rubenada commented on a change in pull request #1369: [CALCITE-2979] Add a block-based nested loop join algorithm (Khawla Mouhoubi)
URL: https://github.com/apache/calcite/pull/1369#discussion_r315091731
 
 

 ##########
 File path: linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
 ##########
 @@ -1280,6 +1280,181 @@ private void closeInner() {
     throw new NoSuchElementException();
   }
 
+  /**
+   * Implementation of the batch nested loop join algorithm.
+   *
+   * It will fetch blocks of size {@code batchSize} from {@code outer},
+   * storing each block into a list ({@code outerValues}).
+   * For each block, it will use the {@code inner} function to
+   * obtain an enumerable with the correlated rows from the right (inner) 
input.
+   *
+   * Each result present in the {@code innerEnumerator} has matched at least 
one
+   * value from the block {@code outerValues}.
+   * At this point we will perform a mini nested loop between the outer values
+   * and inner values using the {@code predicate} to find out the actual 
matching join results.
+   *
+   * In order to optimize this mini nested loop, during the first iteration
+   * (the first value from {@code outerValues}) we use the {@code 
innerEnumerator}
+   * to compare it to inner rows, and at the same time we fill a list ({@code 
innerValues})
+   * with said {@code innerEnumerator} rows. In the subsequent iterations
+   * (2nd, 3rd, etc. value from {@code outerValues}) the list {@code 
innerValues} will be used,
+   * since it contains all the {@code innerEnumerator} values,
+   * which were stored in the first iteration.
+   */
+  public static <TSource, TInner, TResult> Enumerable<TResult> 
correlateBatchJoin(
+      final JoinType joinType,
+      final Enumerable<TSource> outer,
+      final Function1<List<TSource>, Enumerable<TInner>> inner,
+      final Function2<TSource, TInner, TResult> resultSelector,
+      final Predicate2<TSource, TInner> predicate,
+      final int batchSize) {
+    return new AbstractEnumerable<TResult>() {
+      @Override public Enumerator<TResult> enumerator() {
+        return new Enumerator<TResult>() {
+          private Enumerator<TSource> outerEnumerator = outer.enumerator();
+          List<TSource> outerValues = new ArrayList<>();
+          List<TInner> innerValues = new ArrayList<>();
+          Enumerable<TInner> innerEnumerable;
+          Enumerator<TInner> innerEnumerator;
+          boolean innerEnumHasNext = false;
+          TSource outerValue;
+          TInner innerValue;
+          boolean atLeastOneResult = false;
+          int i = -1; // outer position
+          int j = -1; // inner position
+
+          public TResult current() {
+            return resultSelector.apply(outerValue, innerValue);
+          }
+
+          public boolean moveNext() {
+            while (true) {
+              // Fetch a new batch
+              if (i == outerValues.size() || i == -1) {
+                i = 0;
+                j = 0;
+                outerValues.clear();
+                while (outerValues.size() < batchSize && 
outerEnumerator.moveNext()) {
+                  TSource tSource = outerEnumerator.current();
+                  outerValues.add(tSource);
+                }
+                if (outerValues.isEmpty()) {
+                  return false;
+                }
+                innerEnumerable = inner.apply(new AbstractList<TSource>() {
+                  // If the last batch isn't complete fill it with the first 
value
+                  // No harm since it's a disjunction
+                  @Override public TSource get(final int index) {
+                    return index < outerValues.size() ? outerValues.get(index) 
: outerValues.get(0);
+                  }
+                  @Override public int size() {
+                    return batchSize;
+                  }
+                });
+                if (innerEnumerable == null) {
+                  innerEnumerable = Linq4j.emptyEnumerable();
+                }
+                innerEnumerator = innerEnumerable.enumerator();
+                innerEnumHasNext = innerEnumerator.moveNext();
+
+                // If no inner values skip the whole batch
+                // in case of SEMI and INNER join
+                if (!innerEnumHasNext
+                    && (joinType == JoinType.SEMI || joinType == 
JoinType.INNER)) {
+                  i = outerValues.size();
+                  continue;
+                }
+              }
+              if (innerHasNext()) {
+                outerValue = outerValues.get(i); // get current outer value
+                nextInnerValue();
+                // Compare current block row to current inner value
+                if (predicate.apply(outerValue, innerValue)) {
+                  atLeastOneResult = true;
+
+                  // Skip the rest of inner values in case of
+                  // ANTI and SEMI when a match is found
+                  if (joinType == JoinType.ANTI || joinType == JoinType.SEMI) {
+                    // Two ways of skipping inner values,
+                    // enumerator way and ArrayList way
+                    if (i == 0) {
+                      fillList();
+                      innerEnumHasNext = false;
+                    } else {
+                      j = innerValues.size();
+                    }
+                    if (joinType == JoinType.ANTI) {
+                      continue;
+                    }
+                  }
+                  return true;
+                }
+              } else { // End of inner
+                if (!atLeastOneResult
+
+                    && (joinType == JoinType.LEFT
+                    || joinType == JoinType.ANTI)) {
+                  outerValue = outerValues.get(i); // get current outer value
+                  innerValue = null;
+                  nextOuterValue();
+                  return true;
+                }
+                nextOuterValue();
+              }
+            }
+          }
+
+          public void nextOuterValue() {
+            i++; // next outerValue
+            j = 0; // rewind innerValues
+            atLeastOneResult = false;
+          }
+
+          private void nextInnerValue() {
+            if (i == 0) {
+              innerValue = innerEnumerator.current();
+              innerValues.add(innerValue);
+              innerEnumHasNext = innerEnumerator.moveNext(); // next 
enumerator inner value
+            } else {
+              innerValue = innerValues.get(j++); // next ArrayList inner value
+            }
+          }
+
+          private boolean innerHasNext() {
+            return (innerEnumHasNext && i == 0) || (j < innerValues.size() && 
i != 0);
+          }
+
+          private void fillList() {
+            while (innerEnumHasNext) {
+              innerValues.add(innerEnumerator.current());
+              innerEnumHasNext = innerEnumerator.moveNext();
+            }
+          }
+          public void reset() {
+            outerEnumerator.reset();
+            innerValue = null;
+            outerValue = null;
+            outerValues.clear();
+            if (innerEnumerator != null) {
 
 Review comment:
   I think this is not really needed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to