DRILL-6323: Lateral Join - Remove codegen and operator template class. Logic to
copy data is moved into LateralJoinBatch itself.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5ed31962
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5ed31962
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5ed31962

Branch: refs/heads/master
Commit: 5ed319620b66009f069934bba72f2740ff3cacaf
Parents: 74565cc
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
Authored: Fri Mar 9 15:56:22 2018 -0800
Committer: Parth Chandra <par...@apache.org>
Committed: Tue Apr 17 18:15:54 2018 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/join/LateralJoin.java    |  57 ----
 .../physical/impl/join/LateralJoinBatch.java    | 313 ++++++++-----------
 .../physical/impl/join/LateralJoinTemplate.java | 147 ---------
 .../impl/join/TestLateralJoinCorrectness.java   |  67 ++++
 4 files changed, 195 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5ed31962/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java
deleted file mode 100644
index 723c0ef..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.join;
-
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.drill.exec.compile.TemplateClassDefinition;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.LateralJoinPOP;
-import org.apache.drill.exec.record.ExpandableHyperContainer;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
-
-import java.util.LinkedList;
-
-/**
- * Interface for the lateral join operator.
- */
-public interface LateralJoin {
-  public static TemplateClassDefinition<LateralJoin> TEMPLATE_DEFINITION =
-    new TemplateClassDefinition<>(LateralJoin.class, 
LateralJoinTemplate.class);
-
-  public void setupLateralJoin(FragmentContext context, RecordBatch left,
-                               RecordBatch right, LateralJoinBatch outgoing,
-                               JoinRelType joinType);
-
-  // Produce output records taking into account join type
-  public int crossJoinAndOutputRecords(int leftIndex, int rightIndex);
-
-  public int generateLeftJoinOutput(int leftIndex);
-
-  public void updateOutputIndex(int newOutputIndex);
-
-  // Project the record at offset 'leftIndex' in the left input batch into the 
output container at offset 'outIndex'
-  public void emitLeft(int leftIndex, int outIndex);
-
-  // Project the record from the hyper container given the batch index and the 
record within the batch at 'outIndex'
-  public void emitRight(int rightIndex, int outIndex);
-
-  // Setup the input/output value vector references
-  public void doSetup(FragmentContext context, RecordBatch rightBatch,
-                      RecordBatch leftBatch, RecordBatch outgoing);
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/5ed31962/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 122ff86..f01bf1c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -18,19 +18,11 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import com.google.common.base.Preconditions;
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JVar;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.LateralContract;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
@@ -38,12 +30,10 @@ import 
org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.AllocationHelper;
-
-import java.io.IOException;
+import org.apache.drill.exec.vector.ValueVector;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
@@ -74,11 +64,8 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   // Schema on the right side
   private BatchSchema rightSchema = null;
 
-  // Runtime generated class implementing the LateralJoin interface
-  private LateralJoin lateralJoiner = null;
-
-  // Number of output records in the current outgoing batch
-  private int outputRecords = 0;
+  // Index in output batch to populate next row
+  private int outputIndex = 0;
 
   // Current index of record in left incoming which is being processed
   private int leftJoinIndex = -1;
@@ -92,49 +79,17 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   // Keep track if any matching right record was found for current left index 
record
   private boolean matchedRecordFound = false;
 
-  private boolean enableLateralCGDebugging = true;
-
-  // Shared Generator mapping for the left/right side : constant
-  private static final GeneratorMapping EMIT_CONSTANT =
-    GeneratorMapping.create("doSetup"/* setup method */,"doSetup" /* eval 
method */,
-      null /* reset */, null /* cleanup */);
-
-  // Generator mapping for the right side
-  private static final GeneratorMapping EMIT_RIGHT =
-    GeneratorMapping.create("doSetup"/* setup method */,"emitRight" /* eval 
method */,
-      null /* reset */,null /* cleanup */);
-
-  // Generator mapping for the left side : scalar
-  private static final GeneratorMapping EMIT_LEFT =
-    GeneratorMapping.create("doSetup" /* setup method */, "emitLeft" /* eval 
method */,
-      null /* reset */, null /* cleanup */);
-
-  // Mapping set for the right side
-  private static final MappingSet emitRightMapping =
-    new MappingSet("rightIndex" /* read index */, "outIndex" /* write index */,
-      "rightBatch" /* read container */,"outgoing" /* write container */,
-      EMIT_CONSTANT, EMIT_RIGHT);
-
-  // Mapping set for the left side
-  private static final MappingSet emitLeftMapping =
-    new MappingSet("leftIndex" /* read index */, "outIndex" /* write index */,
-      "leftBatch" /* read container */,"outgoing" /* write container */,
-      EMIT_CONSTANT, EMIT_LEFT);
-
   protected LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
                              RecordBatch left, RecordBatch right) throws 
OutOfMemoryException {
     super(popConfig, context, left, right);
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
-    enableLateralCGDebugging = 
true;//context.getConfig().getBoolean(ExecConstants.ENABLE_CODE_DUMP_DEBUG_LATERAL);
   }
 
   private boolean handleSchemaChange() {
     try {
       stats.startSetup();
       setupNewSchema();
-      lateralJoiner = setupWorker();
-      lateralJoiner.setupLateralJoin(context, left, right, this, 
popConfig.getJoinType());
       return true;
     } catch (SchemaChangeException ex) {
       logger.error("Failed to handle schema change hence killing the query");
@@ -184,7 +139,7 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
             } else {
               leftUpstream = OK;
             }
-          } else if (outputRecords > 0) {
+          } else if (outputIndex > 0) {
             // This means there is already some records from previous join 
inside left batch
             // So we need to pass that downstream and then handle the 
OK_NEW_SCHEMA in subsequent next call
             processLeftBatchInFuture = true;
@@ -225,7 +180,7 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
         case NONE:
         case STOP:
           // Not using =0 since if outgoing container is empty then no point 
returning anything
-          if (outputRecords > 0) {
+          if (outputIndex > 0) {
             processLeftBatchInFuture = true;
           }
           return leftUpstream;
@@ -253,19 +208,18 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   private IterOutcome processRightBatch() {
     // Check if we still have records left to process in left incoming from 
new batch or previously half processed
     // batch based on indexes. We are making sure to update leftJoinIndex and 
rightJoinIndex correctly. Like for new
-    // batch leftJoinIndex will always be set to zero and once leftSide batch 
is fully processed then
-    // it will be set to -1.
-    // Whereas rightJoinIndex is to keep track of record in right batch being 
joined with
-    // record in left batch. So when there are cases such that all records in 
right batch is not consumed
-    // by the output, then rightJoinIndex will be a valid index. When all 
records are consumed it will be set to -1.
+    // batch leftJoinIndex will always be set to zero and once leftSide batch 
is fully processed then it will be set
+    // to -1.
+    // Whereas rightJoinIndex is to keep track of record in right batch being 
joined with record in left batch.
+    // So when there are cases such that all records in right batch is not 
consumed by the output, then rightJoinIndex
+    // will be a valid index. When all records are consumed it will be set to 
-1.
     boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1);
     while (needNewRightBatch) {
       rightUpstream = next(RIGHT_INPUT, right);
       switch (rightUpstream) {
         case OK_NEW_SCHEMA:
           // We should not get OK_NEW_SCHEMA multiple times for the same left 
incoming batch. So there won't be a
-          // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> 
OK_NEW_SCHEMA --> OK/EMIT
-          // fall through
+          // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> 
OK_NEW_SCHEMA --> OK/EMIT fall through
           //
           // Right batch with OK_NEW_SCHEMA is always going to be an empty 
batch, so let's pass the new schema
           // downstream and later with subsequent next() call the join output 
will be produced
@@ -285,8 +239,8 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
           }
         case OK:
         case EMIT:
-          // Even if there are no records we should not call next() again 
because in case of LEFT join
-          // empty batch is of importance too
+          // Even if there are no records we should not call next() again 
because in case of LEFT join empty batch is
+          // of importance too
           rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
           needNewRightBatch = false;
           break;
@@ -351,17 +305,15 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
     // output container based on new left schema and old right schema. If 
schema change failed then return STOP
     // downstream
     if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) {
-        return STOP;
+      return STOP;
     }
 
     // Setup the references of left, right and outgoing container in generated 
operator
-    if (state == BatchState.FIRST) {
-      lateralJoiner.setupLateralJoin(context, left, right, this, 
popConfig.getJoinType());
-      state = BatchState.NOT_FIRST;
-    }
+    state = BatchState.NOT_FIRST;
 
     // allocate space for the outgoing batch
     allocateVectors();
+
     return produceOutputBatch();
   }
 
@@ -379,27 +331,26 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
     boolean isLeftProcessed = false;
 
     // Try to fully pack the outgoing container
-    while (outputRecords < LateralJoinBatch.MAX_BATCH_SIZE) {
-      int previousOutputCount = outputRecords;
+    while (outputIndex < LateralJoinBatch.MAX_BATCH_SIZE) {
+      final int previousOutputCount = outputIndex;
       // invoke the runtime generated method to emit records in the output 
batch for each leftJoinIndex
-      outputRecords = lateralJoiner.crossJoinAndOutputRecords(leftJoinIndex, 
rightJoinIndex);
+      outputIndex = crossJoinAndOutputRecords(leftJoinIndex, rightJoinIndex, 
outputIndex);
 
       // We have produced some records in outgoing container, hence there must 
be a match found for left record
-      if (outputRecords > previousOutputCount) {
+      if (outputIndex > previousOutputCount) {
+        // Need this extra flag since there can be a left join case where, for the current leftJoinIndex, it receives a right
+        // batch with data, then an empty batch, and then another empty batch with EMIT outcome. If we just used
+        // outputIndex then we would lose the information that a few rows for leftJoinIndex were already produced using
+        // the first right batch
         matchedRecordFound = true;
       }
 
-      if (right.getRecordCount() == 0) {
-        rightJoinIndex = -1;
-      } else {
-        // One right batch might span across multiple output batch. So 
rightIndex will be moving sum of all the
-        // output records for this record batch until it's fully consumed.
-        //
-        // Also it can be so that one output batch can contain records from 2 
different right batch hence the
-        // rightJoinIndex should move by number of records in output batch for 
current right batch only.
-        rightJoinIndex += outputRecords - previousOutputCount;
-      }
-
+      // One right batch might span across multiple output batches. So rightJoinIndex will be the moving sum of all the
+      // output records for this right batch until it's fully consumed.
+      //
+      // Also, one output batch can contain records from 2 different right batches, hence
+      // rightJoinIndex should move only by the number of records in the output batch for the current right batch.
+      rightJoinIndex += outputIndex - previousOutputCount;
       final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex 
>= right.getRecordCount();
 
       // Check if above join to produce output was based on empty right batch 
or
@@ -408,9 +359,9 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
       // Otherwise it means for the given right batch there is still some 
record left to be processed.
       if (isRightProcessed) {
         if (rightUpstream == EMIT) {
-          if (!matchedRecordFound) {
-            // will only produce left side in case of LEFT join
-            outputRecords = 
lateralJoiner.generateLeftJoinOutput(leftJoinIndex);
+          if (!matchedRecordFound && JoinRelType.LEFT == 
popConfig.getJoinType()) {
+            // copy left side in case of LEFT join
+            emitLeft(leftJoinIndex, outputIndex++);
           }
           ++leftJoinIndex;
           // Reset matchedRecord for next left index record
@@ -430,7 +381,7 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
       }
 
       // Check if output batch still has some space
-      if (outputRecords < MAX_BATCH_SIZE) {
+      if (outputIndex < MAX_BATCH_SIZE) {
         // Check if left side still has records or not
         if (isLeftProcessed) {
           // The left batch was with EMIT/OK_NEW_SCHEMA outcome, then return 
output to downstream layer
@@ -467,9 +418,7 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
         // batch. Now we have got new left batch with OK outcome. Let's get 
next right batch
         //
         // It will not hit OK_NEW_SCHEMA since left side have not seen that 
outcome
-
         rightUpstream = processRightBatch();
-
         Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected 
schema change in right branch");
 
         if (isTerminalOutcome(rightUpstream)) {
@@ -501,19 +450,15 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
    * Finalizes the current output container with the records produced so far 
before sending it downstream
    */
   private void finalizeOutputContainer() {
-
-    VectorAccessibleUtilities.setValueCount(container, outputRecords);
+    VectorAccessibleUtilities.setValueCount(container, outputIndex);
 
     // Set the record count in the container
-    container.setRecordCount(outputRecords);
+    container.setRecordCount(outputIndex);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    logger.debug("Number of records emitted: " + outputIndex);
 
-    logger.debug("Number of records emitted: " + outputRecords);
-
-    // We are about to send the output batch so reset the outputRecords for 
future next call
-    outputRecords = 0;
     // Update the output index for next output batch to zero
-    lateralJoiner.updateOutputIndex(0);
+    outputIndex = 0;
   }
 
   /**
@@ -595,96 +540,12 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
     }
 
     // Let's build schema for the container
-    container.setRecordCount(0);
+    outputIndex = 0;
+    container.setRecordCount(outputIndex);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
   }
 
   /**
-   * Method generates the runtime code needed for LateralJoin. Other than the 
setup method to set the input and output
-   * value vector references we implement two more methods
-   * 1. emitLeft() -> Copy record from the left side to output container
-   * 2. emitRight() -> Copy record from the right side to output container
-   * @return the runtime generated class that implements the LateralJoin 
interface
-   */
-  private LateralJoin setupWorker() throws SchemaChangeException {
-    final CodeGenerator<LateralJoin> lateralCG = CodeGenerator.get(
-      LateralJoin.TEMPLATE_DEFINITION, context.getOptions());
-    lateralCG.plainJavaCapable(true);
-
-    // To enabled code gen dump for lateral use the setting 
ExecConstants.ENABLE_CODE_DUMP_DEBUG_LATERAL
-    lateralCG.saveCodeForDebugging(enableLateralCGDebugging);
-    final ClassGenerator<LateralJoin> nLJClassGenerator = lateralCG.getRoot();
-
-    // generate emitLeft
-    nLJClassGenerator.setMappingSet(emitLeftMapping);
-    JExpression outIndex = JExpr.direct("outIndex");
-    JExpression leftIndex = JExpr.direct("leftIndex");
-
-    int fieldId = 0;
-    int outputFieldId = 0;
-    if (leftSchema != null) {
-      // Set the input and output value vector references corresponding to the 
left batch
-      for (MaterializedField field : leftSchema) {
-        final TypeProtos.MajorType fieldType = field.getType();
-
-        // Add the vector to the output container
-        container.addOrGet(field);
-
-        JVar inVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch",
-            new TypedFieldId(fieldType, false, fieldId));
-        JVar outVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
-            new TypedFieldId(fieldType, false, outputFieldId));
-
-        
nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV));
-        nLJClassGenerator.rotateBlock();
-        fieldId++;
-        outputFieldId++;
-      }
-    }
-
-    // generate emitRight
-    fieldId = 0;
-    nLJClassGenerator.setMappingSet(emitRightMapping);
-    JExpression rightIndex = JExpr.direct("rightIndex");
-
-    if (rightSchema != null) {
-      // Set the input and output value vector references corresponding to the 
right batch
-      for (MaterializedField field : rightSchema) {
-
-        final TypeProtos.MajorType inputType = field.getType();
-        TypeProtos.MajorType outputType;
-        // if join type is LEFT, make sure right batch output fields data mode 
is optional
-        if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() 
== TypeProtos.DataMode.REQUIRED) {
-          outputType = Types.overrideMode(inputType, 
TypeProtos.DataMode.OPTIONAL);
-        } else {
-          outputType = inputType;
-        }
-
-        MaterializedField newField = MaterializedField.create(field.getName(), 
outputType);
-        container.addOrGet(newField);
-
-        JVar inVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("rightBatch",
-            new TypedFieldId(inputType, false, fieldId));
-        JVar outVV = 
nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
-            new TypedFieldId(outputType, false, outputFieldId));
-        nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe")
-            .arg(rightIndex)
-            .arg(outIndex)
-            .arg(inVV));
-        nLJClassGenerator.rotateBlock();
-        fieldId++;
-        outputFieldId++;
-      }
-    }
-
-    try {
-      return context.getImplementationClass(lateralCG);
-    } catch (IOException | ClassTransformationException ex) {
-      throw new SchemaChangeException("Failed while setting up generated class 
with new schema information", ex);
-    }
-  }
-
-  /**
    * Simple method to allocate space for all the vectors in the container.
    */
   private void allocateVectors() {
@@ -715,8 +576,6 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
    * failure outcome then we don't even call next on right branch, since there 
is no left incoming.
    * @return true if both the left/right batch was received without failure 
outcome.
    *         false if either of batch is received with failure outcome.
-   *
-   * @throws SchemaChangeException
    */
   @Override
   protected boolean prefetchFirstBatchFromBothSides() {
@@ -760,10 +619,6 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
     // Release the vectors received from right side
     VectorAccessibleUtilities.clear(right);
 
-    // We should not allocate memory for all the value vectors inside output 
batch
-    // since this is buildSchema phase and we are sending empty batches 
downstream
-    lateralJoiner = setupWorker();
-
     // Set join index as invalid (-1) if the left side is empty, else set it 
to 0
     leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0;
     rightJoinIndex = -1;
@@ -825,7 +680,95 @@ public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP>
   /**
    * Returns the current {@link 
org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming 
batch
    */
+  @Override
   public IterOutcome getLeftOutcome() {
     return leftUpstream;
   }
+
+  /**
+   * Main entry point for producing the output records. This method populates the output batch after cross join of
+   * the record in a given left batch at left index and all the corresponding right batches produced for this left index.
+   * Right records are copied starting from rightIndex until the right batch is exhausted or the output batch is full.
+   *
+   * @param leftIndex - row index in left incoming batch
+   * @param rightIndex - row index in right incoming batch
+   * @param outIndex - row index in output batch
+   *
+   * @return - index in the output batch at which the next row should be written
+   */
+  private int crossJoinAndOutputRecords(final int leftIndex, final int 
rightIndex, final int outIndex) {
+    final int rightRecordCount = right.getRecordCount();
+    int outBatchIndex = outIndex;
+
+    // If there is no record in right batch just return current index in 
output batch
+    if (rightRecordCount <= 0) {
+      return outBatchIndex;
+    }
+
+    // A non-empty right batch must come with a valid start index (empty right batches were handled above)
+    Preconditions.checkState(rightIndex != -1, "Right batch record count is >0 
but index is -1");
+    // For every record in right side just emit left and right records in 
output container
+    for (int i = rightIndex; i < rightRecordCount; ++i) {
+      emitLeft(leftIndex, outBatchIndex);
+      emitRight(i, outBatchIndex);
+      ++outBatchIndex;
+
+      if (outBatchIndex >= LateralJoinBatch.MAX_BATCH_SIZE) {
+        break;
+      }
+    }
+    return outBatchIndex;
+  }
+
+  /**
+   * Given a record batch, copies data from all its vectors at fromRowIndex to all the vectors in output batch at
+   * toRowIndex. It iterates over the vectors from startVectorIndex (inclusive) to endVectorIndex (exclusive) inside the
+   * record batch and copies the data into the output vectors from startVectorIndex + baseVectorIndex (inclusive) to
+   * endVectorIndex + baseVectorIndex (exclusive).
+   * @param fromRowIndex - row index of all the vectors in batch to copy data from
+   * @param toRowIndex - row index of all the vectors in outgoing batch to copy data to
+   * @param batch - source record batch holding vectors with data
+   * @param startVectorIndex - start index (inclusive) of vector inside source record batch
+   * @param endVectorIndex - end index (exclusive) of vector inside source record batch
+   * @param baseVectorIndex - offset added to each source vector index to get the corresponding vector in outgoing batch
+   */
+  private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, 
RecordBatch batch,
+                                      int startVectorIndex, int 
endVectorIndex, int baseVectorIndex) {
+    // Get the vectors using field index rather than Materialized field since 
input batch field can be different from
+    // output container field in case of Left Join. As we rebuild the right 
Schema field to be optional for output
+    // container.
+    for (int i = startVectorIndex; i < endVectorIndex; ++i) {
+      // Get input vector
+      final Class<?> inputValueClass = 
batch.getSchema().getColumn(i).getValueClass();
+      final ValueVector inputVector = 
batch.getValueAccessorById(inputValueClass, i).getValueVector();
+
+      // Get output vector
+      final int outputVectorIndex = i + baseVectorIndex;
+      final Class<?> outputValueClass = 
this.getSchema().getColumn(outputVectorIndex).getValueClass();
+      final ValueVector outputVector = 
this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
+
+      // Copy data from input vector to output vector
+      outputVector.copyEntry(toRowIndex, inputVector, fromRowIndex);
+    }
+  }
+
+  /**
+   * Copies data at leftIndex from each of the vectors in the left incoming batch to outIndex in the corresponding
+   * vectors of the outgoing record batch
+   * @param leftIndex - index to copy data from left incoming batch vectors
+   * @param outIndex - index to copy data to in outgoing batch vectors
+   */
+  private void emitLeft(int leftIndex, int outIndex) {
+    copyDataToOutputVectors(leftIndex, outIndex, left, 0, 
leftSchema.getFieldCount(), 0);
+  }
+
+  /**
+   * Copies data at rightIndex from each of the vectors in the right incoming batch to outIndex in the corresponding
+   * vectors of the outgoing record batch
+   * @param rightIndex - index to copy data from right incoming batch vectors
+   * @param outIndex - index to copy data to in outgoing batch vectors
+   */
+  private void emitRight(int rightIndex, int outIndex) {
+    copyDataToOutputVectors(rightIndex, outIndex, right, 0, 
rightSchema.getFieldCount(), leftSchema.getFieldCount());
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5ed31962/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java
deleted file mode 100644
index d7b4f1d..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.join;
-
-import com.google.common.base.Preconditions;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-import javax.inject.Named;
-
-/*
- * Template class that combined with the runtime generated source implements 
the LateralJoin interface. This
- * class contains the main lateral join logic.
- */
-public abstract class LateralJoinTemplate implements LateralJoin {
-
-  // Current right input batch being processed
-  private RecordBatch right = null;
-
-  // Index in outgoing container where new record will be inserted
-  private int outputIndex = 0;
-
-  // Keep the join type at setup phase
-  private JoinRelType joinType;
-
-  /**
-   * Method initializes necessary state and invokes the doSetup() to set the 
input and output value vector references.
-   *
-   * @param context Fragment context
-   * @param left Current left input batch being processed
-   * @param right Current right input batch being processed
-   * @param outgoing Output batch
-   */
-  public void setupLateralJoin(FragmentContext context,
-                               RecordBatch left, RecordBatch right,
-                               LateralJoinBatch outgoing, JoinRelType 
joinType) {
-    this.right = right;
-    this.joinType = joinType;
-    doSetup(context, this.right, left, outgoing);
-  }
-
-  /**
-   * Main entry point for producing the output records. This method populates 
the output batch after cross join of
-   * the record in a given left batch at left index and all the corresponding 
right batches produced for
-   * this left index. The right container is copied starting from rightIndex 
until number of records in the container.
-   *
-   * @return the number of records produced in the output batch
-   */
-  public int crossJoinAndOutputRecords(int leftIndex, int rightIndex) {
-
-    final int rightRecordCount = right.getRecordCount();
-    int currentOutputIndex = outputIndex;
-
-    // If there is no record in right batch just return current index in 
output batch
-    if (rightRecordCount <= 0) {
-      return currentOutputIndex;
-    }
-
-    // Check if right batch is empty since we have to handle left join case
-    Preconditions.checkState(rightIndex != -1, "Right batch record count is >0 
but index is -1");
-    // For every record in right side just emit left and right records in 
output container
-    for (; rightIndex < rightRecordCount; ++rightIndex) {
-      emitLeft(leftIndex, currentOutputIndex);
-      emitRight(rightIndex, currentOutputIndex);
-      ++currentOutputIndex;
-
-      if (currentOutputIndex >= LateralJoinBatch.MAX_BATCH_SIZE) {
-        break;
-      }
-    }
-
-    updateOutputIndex(currentOutputIndex);
-    return currentOutputIndex;
-  }
-
-  /**
-   * If current output batch is full then reset the output index for next 
output batch
-   * Otherwise it means we still have space left in output batch, so next call 
will continue populating from
-   * newOutputIndex
-   * @param newOutputIndex - new output index of outgoing batch after copying 
the records
-   */
-  public void updateOutputIndex(int newOutputIndex) {
-    outputIndex = (newOutputIndex >= LateralJoinBatch.MAX_BATCH_SIZE) ?
-      0 : newOutputIndex;
-  }
-
-  /**
-   * Method to copy just the left batch record at given leftIndex, the right 
side records will be NULL. This is
-   * used in case when Join Type is LEFT and we have only seen empty batches 
from right side
-   * @param leftIndex - index in left batch to copy record from
-   */
-  public int generateLeftJoinOutput(int leftIndex) {
-    int currentOutputIndex = outputIndex;
-
-    if (JoinRelType.LEFT == joinType) {
-      emitLeft(leftIndex, currentOutputIndex++);
-      updateOutputIndex(currentOutputIndex);
-    }
-    return currentOutputIndex;
-  }
-
-  /**
-   * Generated method to setup vector references in rightBatch, leftBatch and 
outgoing batch. It should be called
-   * after initial schema build phase, when the schema for outgoing container 
is known. This method should also be
-   * called after each New Schema discovery during execution.
-   * @param context
-   * @param rightBatch - right incoming batch
-   * @param leftBatch - left incoming batch
-   * @param outgoing - output batch
-   */
-  public abstract void doSetup(@Named("context") FragmentContext context,
-                               @Named("rightBatch") RecordBatch rightBatch,
-                               @Named("leftBatch") RecordBatch leftBatch,
-                               @Named("outgoing") RecordBatch outgoing);
-
-  /**
-   * Generated method to copy the record from right batch at rightIndex to 
outgoing batch at outIndex
-   * @param rightIndex - index to copy record from the right batch
-   * @param outIndex - index to copy record to a outgoing batch
-   */
-  public abstract void emitRight(@Named("rightIndex") int rightIndex,
-                                 @Named("outIndex") int outIndex);
-
-  /**
-   * Generated method to copy the record from left batch at leftIndex to 
outgoing batch at outIndex
-   * @param leftIndex - index to copy record from the left batch
-   * @param outIndex - index to copy record to a outgoing batch
-   */
-  public abstract void emitLeft(@Named("leftIndex") int leftIndex,
-                                @Named("outIndex") int outIndex);
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/5ed31962/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 8e4d1d3..b237ef7 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -1504,6 +1504,73 @@ public class TestLateralJoinCorrectness extends 
SubOperatorTest {
   }
 
   /**
+   * Test to see if there are multiple rows in left batch and for some rows right side produces multiple batches such
+   * that some are with records and some are empty then we are not duplicating those left side records based on left
+   * join type. In this case all the output will be produced in only 1 record batch.
+   *
+   * @throws Exception if the lateral join or the mock batches fail; assertion failures propagate with their details
+   */
+  @Test
+  public void testLeftLateralJoin_WithMatchingAndEmptyBatch() throws Exception {
+    // Get the left container with dummy data for Lateral Join
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
+      .addRow(1, 10, "item10")
+      .addRow(2, 20, "item20")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(6, 60, "item61")
+      .addRow(7, 70, "item71")
+      .addRow(8, 80, "item81")
+      .build();
+
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(emptyRightRowSet.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT);
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      final int expectedOutputRecordCount = 6; // 3 for first left row and 3 (nonEmptyRightRowSet2) for second left row
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() == expectedOutputRecordCount);
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case; failures above propagate with their original details
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      // Release the row sets allocated locally by this test
+      leftRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+    }
+  }
+
+  /**
    * Test to see if there are multiple rows in left batch and for some rows 
right side produces batch with records
    * and for other rows right side produces empty batches then based on left 
join type we are populating the output
    * batch correctly. Expectation is that for left rows if we find 
corresponding right rows then we will output both

Reply via email to