DRILL-6323: Lateral Join - Initial implementation

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5a63e274
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5a63e274
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5a63e274

Branch: refs/heads/master
Commit: 5a63e2748d71134c0258147972b45a0fb25f5461
Parents: 8051c24
Author: Sorabh Hamirwasia <shamirwa...@maprtech.com>
Authored: Mon Feb 5 14:46:19 2018 -0800
Committer: Parth Chandra <par...@apache.org>
Committed: Tue Apr 17 18:15:44 2018 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   5 +
 .../physical/base/AbstractPhysicalVisitor.java  |   3 -
 .../exec/physical/base/PhysicalVisitor.java     |   3 -
 .../exec/physical/config/LateralJoinPOP.java    |  55 ++
 .../exec/physical/impl/join/LateralJoin.java    |  55 ++
 .../physical/impl/join/LateralJoinBatch.java    | 741 +++++++++++++++++++
 .../impl/join/LateralJoinBatchCreator.java      |  34 +
 .../physical/impl/join/LateralJoinTemplate.java | 146 ++++
 .../physical/impl/sort/RecordBatchData.java     |   1 +
 .../exec/record/AbstractBinaryRecordBatch.java  |  11 +
 .../drill/exec/record/AbstractRecordBatch.java  |   1 +
 11 files changed, 1049 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 77fa211..5a89081 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -186,6 +186,11 @@ public final class ExecConstants {
   public static final String BIT_ENCRYPTION_SASL_ENABLED = 
"drill.exec.security.bit.encryption.sasl.enabled";
   public static final String BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = 
"drill.exec.security.bit.encryption.sasl.max_wrapped_size";
 
+  /**
+   * Boolean setting that enables or disables writing a dump of the generated
+   * code of the Lateral join operator, for debugging.
+   */
+  public static final String ENABLE_CODE_DUMP_DEBUG_LATERAL = 
"drill.exec.compile.codegen.debug.lateral";
+
   /** Size of JDBC batch queue (in batches) above which throttling begins. */
   public static final String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
       "drill.jdbc.batch_queue_throttling_threshold";

http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 3a5d22e..66affeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -21,13 +21,10 @@ import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
-import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.config.IteratorValidator;
 import org.apache.drill.exec.physical.config.Limit;
-import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
-import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.config.Project;

http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index db9c873..8480a07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -21,13 +21,10 @@ import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
-import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.config.IteratorValidator;
 import org.apache.drill.exec.physical.config.Limit;
-import org.apache.drill.exec.physical.config.MergeJoinPOP;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
-import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.config.Project;

http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
new file mode 100644
index 0000000..946b4a6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.physical.base.AbstractJoinPop;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+
+import java.util.List;
+
+/**
+ * Physical operator configuration for the lateral join. A lateral join has exactly two children, the left input
+ * and the right input, and is configured with a {@link JoinRelType}.
+ */
+@JsonTypeName("lateral-join")
+public class LateralJoinPOP extends AbstractJoinPop {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class);
+
+  /**
+   * Creates a lateral join configuration.
+   *
+   * @param left the left input operator
+   * @param right the right input operator
+   * @param joinType the join type
+   */
+  @JsonCreator
+  public LateralJoinPOP(
+      @JsonProperty("left") PhysicalOperator left,
+      @JsonProperty("right") PhysicalOperator right,
+      @JsonProperty("joinType") JoinRelType joinType) {
+    // NOTE(review): the two trailing nulls are presumably AbstractJoinPop's join condition(s), which a lateral
+    // join does not use -- confirm against AbstractJoinPop.
+    super(left, right, joinType, null, null);
+  }
+
+  /**
+   * Returns a copy of this operator with the given children and the same join type.
+   *
+   * @param children exactly two operators: the new left input followed by the new right input
+   * @return a new {@link LateralJoinPOP}
+   * @throws IllegalArgumentException if {@code children} does not contain exactly two operators
+   */
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    // Include the actual count so a malformed plan is easy to diagnose.
+    Preconditions.checkArgument(children.size() == 2,
+      "Lateral join should have two physical operators, but %s were provided", children.size());
+    return new LateralJoinPOP(children.get(0), children.get(1), joinType);
+  }
+
+  /** Returns the {@code CoreOperatorType} value identifying the lateral join operator. */
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.LATERAL_JOIN_VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java
new file mode 100644
index 0000000..1d946ce
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoin.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+
+import java.util.LinkedList;
+
+/**
+ * Interface for the lateral join operator. The implementation used at runtime is generated from
+ * {@link LateralJoinTemplate} (see {@link #TEMPLATE_DEFINITION}).
+ */
+public interface LateralJoin {
+  TemplateClassDefinition<LateralJoin> TEMPLATE_DEFINITION =
+    new TemplateClassDefinition<>(LateralJoin.class, LateralJoinTemplate.class);
+
+  /**
+   * Binds the joiner to its inputs and to the outgoing batch.
+   *
+   * @param context fragment context of the owning operator
+   * @param left left input batch
+   * @param right right input batch
+   * @param outgoing the lateral join batch that receives the output records
+   * @param joinType join type of the lateral join
+   */
+  void setupLateralJoin(FragmentContext context, RecordBatch left,
+                        RecordBatch right, LateralJoinBatch outgoing,
+                        JoinRelType joinType);
+
+  /**
+   * Produces output records for the left record at {@code leftIndex}, starting with the right record at
+   * {@code rightIndex}, taking into account the join type.
+   *
+   * @return the number of records in the outgoing batch after this call (callers treat this as a running
+   *         total, not as a per-call count)
+   */
+  int crossJoinAndOutputRecords(int leftIndex, int rightIndex);
+
+  /**
+   * Emits output for the left record at {@code leftIndex} when no matching right record was produced for it.
+   * The calling operator expects this to produce left-side-only output for LEFT joins.
+   */
+  void generateLeftJoinOutput(int leftIndex);
+
+  /**
+   * Projects the record at offset {@code leftIndex} in the left input batch into the output container at offset
+   * {@code outIndex}.
+   */
+  void emitLeft(int leftIndex, int outIndex);
+
+  /**
+   * Projects the record at offset {@code rightIndex} in the right input batch into the output container at offset
+   * {@code outIndex}.
+   */
+  void emitRight(int rightIndex, int outIndex);
+
+  /** Sets up the input/output value vector references used by {@link #emitLeft} and {@link #emitRight}. */
+  void doSetup(FragmentContext context, RecordBatch rightBatch,
+               RecordBatch leftBatch, RecordBatch outgoing);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
new file mode 100644
index 0000000..45b5059
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.AllocationHelper;
+
+import java.io.IOException;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
+/*
+ * RecordBatch implementation for the lateral join operator.
+ * TODO: Create a separate state class for both the left and right batches and store the schema,
+ * TODO: join index and other per-input flags in it (the name BatchState is already used for the operator state).
+ */
+public class LateralJoinBatch extends 
AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);
+
+  /** Upper bound on the number of records written into one outgoing batch. */
+  protected static final int MAX_BATCH_SIZE = 4096;
+
+  /** Input index of the left child; passed to next() so the per-input stats are updated correctly. */
+  protected static final int LEFT_INPUT = 0;
+
+  /** Input index of the right child; passed to next() so the per-input stats are updated correctly. */
+  protected static final int RIGHT_INPUT = 1;
+
+  /*
+   * Code-generation mappings. Arguments of GeneratorMapping.create are, in order:
+   * setup method, eval method, reset method, cleanup method.
+   */
+
+  /** Constants shared by the left and right side are set up and evaluated in "doSetup". */
+  private static final GeneratorMapping EMIT_CONSTANT =
+    GeneratorMapping.create("doSetup", "doSetup", null, null);
+
+  /** Right side: per-record code goes into "emitRight". */
+  private static final GeneratorMapping EMIT_RIGHT =
+    GeneratorMapping.create("doSetup", "emitRight", null, null);
+
+  /** Left side: per-record code goes into "emitLeft". */
+  private static final GeneratorMapping EMIT_LEFT =
+    GeneratorMapping.create("doSetup", "emitLeft", null, null);
+
+  /** Reads container "rightBatch" at "rightIndex" and writes container "outgoing" at "outIndex". */
+  private static final MappingSet emitRightMapping =
+    new MappingSet("rightIndex", "outIndex", "rightBatch", "outgoing", EMIT_CONSTANT, EMIT_RIGHT);
+
+  /** Reads container "leftBatch" at "leftIndex" and writes container "outgoing" at "outIndex". */
+  private static final MappingSet emitLeftMapping =
+    new MappingSet("leftIndex", "outIndex", "leftBatch", "outgoing", EMIT_CONSTANT, EMIT_LEFT);
+
+  /** Previously known schema of the left input. */
+  private BatchSchema leftSchema = null;
+
+  /** Previously known schema of the right input. */
+  private BatchSchema rightSchema = null;
+
+  /** Runtime-generated instance implementing {@link LateralJoin}. */
+  private LateralJoin lateralJoiner = null;
+
+  /** Number of records written so far into the current outgoing batch. */
+  private int outputRecords = 0;
+
+  /** Index of the left record currently being joined, or -1 when a new left batch is needed. */
+  private int leftJoinIndex = -1;
+
+  /** Index of the right record currently being joined, or -1 when a new right batch is needed. */
+  private int rightJoinIndex = -1;
+
+  /** When set, the last left outcome is handled again on a future call instead of fetching a new left batch. */
+  private boolean processLeftBatchInFuture = false;
+
+  /** Whether any right record matched the current left record. */
+  private boolean matchedRecordFound = false;
+
+  /** Presumably toggles generated-code (CG) debugging; always initialized to true for now. */
+  private boolean enableLateralCGDebugging = true;
+
+  /**
+   * Creates the lateral join batch over the given left and right inputs.
+   *
+   * @param popConfig the lateral join configuration
+   * @param context fragment context
+   * @param left left input; must not be null
+   * @param right right input; must not be null
+   * @throws OutOfMemoryException propagated from the base record batch
+   */
+  protected LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
+                             RecordBatch left, RecordBatch right) throws OutOfMemoryException {
+    super(popConfig, context, left, right);
+
+    // A lateral join is meaningless without both inputs.
+    Preconditions.checkNotNull(left);
+    Preconditions.checkNotNull(right);
+
+    // TODO: read this from the ExecConstants.ENABLE_CODE_DUMP_DEBUG_LATERAL setting via
+    // context.getConfig().getBoolean(...) instead of hard-coding true.
+    enableLateralCGDebugging = true;
+  }
+
+  /**
+   * Handles a schema change: calls setupNewSchema(), regenerates the joiner via setupWorker() and binds it to the
+   * inputs and this outgoing batch. Setup time is recorded in the operator stats. On a
+   * {@link SchemaChangeException} the fragment is failed and this operator is killed.
+   *
+   * @return true if the schema change was handled, false if the query was failed
+   */
+  private boolean handleSchemaChange() {
+    try {
+      stats.startSetup();
+      setupNewSchema();
+      lateralJoiner = setupWorker();
+      lateralJoiner.setupLateralJoin(context, left, right, this, popConfig.getJoinType());
+      return true;
+    } catch (SchemaChangeException ex) {
+      // Pass the exception to the logger so the cause and stack trace are not lost.
+      logger.error("Failed to handle schema change hence killing the query", ex);
+      context.getExecutorState().fail(ex);
+      kill(false);
+      return false;
+    } finally {
+      stats.stopSetup();
+    }
+  }
+
+  private boolean isTerminalOutcome(IterOutcome outcome) {
+    return (outcome == STOP || outcome == OUT_OF_MEMORY || outcome == NONE);
+  }
+
+  /**
+   * Obtains the next left batch to join. It only does work when the current left batch has been fully processed
+   * ({@code leftJoinIndex == -1}); otherwise it returns the last left outcome unchanged. It is called from
+   * {@link LateralJoinBatch#innerNext()} on each next() call, and from produceOutputBatch() when the current left
+   * batch is exhausted while the outgoing batch still has room.
+   *
+   * <p>Left outcomes are handled as follows:
+   * <ul>
+   *   <li>If {@code processLeftBatchInFuture} is set, the previously received outcome is handled again instead of
+   *   calling next() on the left input.</li>
+   *   <li>OK_NEW_SCHEMA: if the outgoing container already holds records, the outcome is deferred and
+   *   OK_NEW_SCHEMA is returned so the pending output is sent first. An empty batch with a schema change rebuilds
+   *   the schema and returns OK_NEW_SCHEMA (STOP if that fails). A non-empty batch is then handled like OK.</li>
+   *   <li>OK: empty batches are skipped; otherwise {@code leftJoinIndex} is set to 0.</li>
+   *   <li>EMIT: an empty batch returns EMIT with an empty container; otherwise {@code leftJoinIndex} is set
+   *   to 0.</li>
+   *   <li>NONE/STOP/OUT_OF_MEMORY: the outcome is deferred if output records are pending, the right input is
+   *   killed and drained, and the outcome is returned.</li>
+   *   <li>NOT_YET: sleeps briefly and retries.</li>
+   * </ul>
+   *
+   * @param leftBatch - reference to left incoming record batch. Not needed but provided to make it easy for testing.
+   * @return IterOutcome after processing current left batch
+   */
+  private IterOutcome processLeftBatch(RecordBatch leftBatch) {
+
+    boolean needLeftBatch = leftJoinIndex == -1;
+
+    // Keep pulling from the left input until there is a left batch with records to join or an outcome to return
+    while (needLeftBatch) {
+      leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, leftBatch) : leftUpstream;
+      final boolean emptyLeftBatch = leftBatch.getRecordCount() <= 0;
+
+      switch (leftUpstream) {
+        case OK_NEW_SCHEMA:
+          // The outgoing container already holds records from the previous join. Pass them downstream first
+          // and handle the OK_NEW_SCHEMA in a subsequent next() call.
+          if (outputRecords > 0) {
+            processLeftBatchInFuture = true;
+            return OK_NEW_SCHEMA;
+          }
+
+          // This OK_NEW_SCHEMA is received post build schema phase and from left side.
+          // NOTE(review): isSchemaChanged() currently returns newSchema.isEquivalent(oldSchema), i.e. true when the
+          // schemas are presumably the SAME, so this branch runs when the schema did change. Confirm and fix
+          // isSchemaChanged() rather than this call site.
+          if (!isSchemaChanged(leftBatch.getSchema(), leftSchema)) {
+            logger.warn("New schema received from left side is same as previous known left schema. Ignoring this " +
+              "schema change");
+
+            // Current left batch is empty and schema didn't change as well, so let's get next batch and lose the
+            // OK_NEW_SCHEMA outcome
+            if (emptyLeftBatch) {
+              processLeftBatchInFuture = false;
+              continue;
+            }
+          }
+
+          // If left batch is empty with actual schema change then just rebuild the output container and send empty
+          // batch downstream
+          if (emptyLeftBatch) {
+            if (handleSchemaChange()) {
+              container.setRecordCount(0);
+              leftJoinIndex = -1;
+              return OK_NEW_SCHEMA;
+            } else {
+              return STOP;
+            }
+          } // else - setup the new schema information after getting it from right side too.
+          // Fall through: a non-empty batch is processed like OK.
+        case OK:
+          // With OK outcome we will keep calling next until we get a batch with >0 records
+          if (emptyLeftBatch) {
+            leftJoinIndex = -1;
+            continue;
+          } else {
+            leftJoinIndex = 0;
+          }
+          break;
+        case EMIT:
+          // don't call next on right batch
+          if (emptyLeftBatch) {
+            leftJoinIndex = -1;
+            container.setRecordCount(0);
+            return EMIT;
+          } else {
+            leftJoinIndex = 0;
+          }
+          break;
+        case OUT_OF_MEMORY:
+        case NONE:
+        case STOP:
+          // Not using =0 since if outgoing container is empty then no point returning anything
+          if (outputRecords > 0) {
+            processLeftBatchInFuture = true;
+          }
+          //TODO we got a STOP, shouldn't we stop immediately ?
+          // TODO: check what killAndDrain will do w.r.t UNNEST, we discussed about calling right side
+          // of LATERAL with NONE outcome or calling stop explicitly when NONE is seen on left side
+          killAndDrainIncoming(right, rightUpstream, RIGHT_INPUT);
+          return leftUpstream;
+        case NOT_YET:
+          try {
+            Thread.sleep(5);
+          } catch (InterruptedException ex) {
+            // Preserve the interrupt status so code higher up the call stack can observe the interruption.
+            Thread.currentThread().interrupt();
+            logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " +
+              "received NOT_YET");
+          }
+          break;
+      }
+
+      needLeftBatch = leftJoinIndex == -1;
+    }
+
+    return leftUpstream;
+  }
+
+  /**
+   * Obtains the next right batch for the current left record. next() is only called on the right input when there
+   * is a left record to process ({@code leftJoinIndex >= 0}) and the previous right batch was fully consumed
+   * ({@code rightJoinIndex == -1}); otherwise the last right outcome is returned unchanged.
+   *
+   * @param right the right incoming record batch
+   * @return IterOutcome after processing the right batch
+   */
+  private IterOutcome processRightBatch(RecordBatch right) {
+    // Check if we still have records left to process in left incoming from new batch or previously half processed
+    // batch. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new
+    // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then
+    // it will be set to -1.
+    // Whereas rightJoinIndex is to keep track of record in right batch being joined with
+    // record in left batch. So when there are cases such that all records in right batch is not consumed
+    // by the output, then rightJoinIndex will be a valid index. When all records are consumed it will be set to -1.
+    boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1);
+    while (needNewRightBatch) {
+      rightUpstream = next(RIGHT_INPUT, right);
+      switch (rightUpstream) {
+        case OK_NEW_SCHEMA:
+          // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
+          // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT
+          //
+          // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema
+          // downstream and later with subsequent next() call the join output will be produced
+          Preconditions.checkState(right.getRecordCount() == 0,
+            "Right side batch with OK_NEW_SCHEMA is not empty");
+
+          // NOTE(review): isSchemaChanged() currently returns newSchema.isEquivalent(oldSchema), i.e. true when the
+          // schemas are presumably the SAME, so this branch runs when the schema did change. Confirm and fix
+          // isSchemaChanged() rather than this call site.
+          if (!isSchemaChanged(right.getSchema(), rightSchema)) {
+            logger.warn("New schema received from right side is same as previous known right schema. Ignoring this "
+              + "schema change");
+            continue;
+          }
+          if (handleSchemaChange()) {
+            container.setRecordCount(0);
+            rightJoinIndex = -1;
+            return OK_NEW_SCHEMA;
+          } else {
+            return STOP;
+          }
+        case OK:
+        case EMIT:
+          // Even if there are no records we should not call next() again because in case of LEFT join
+          // empty batch is of importance too
+          rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
+          needNewRightBatch = false;
+          break;
+        case OUT_OF_MEMORY:
+        case NONE:
+        case STOP:
+          //TODO we got a STOP, shouldn't we stop immediately ?
+          // TODO: Should we kill left side if right side fails ?
+          killAndDrainIncoming(left, leftUpstream, LEFT_INPUT);
+          VectorAccessibleUtilities.clear(container);
+          needNewRightBatch = false;
+          break;
+        case NOT_YET:
+          try {
+            Thread.sleep(10);
+          } catch (InterruptedException ex) {
+            // Preserve the interrupt status so code higher up the call stack can observe the interruption.
+            Thread.currentThread().interrupt();
+            logger.debug("Thread interrupted while sleeping to call next on right branch of LATERAL since it " +
+              "received NOT_YET");
+          }
+          break;
+      }
+    }
+
+    return rightUpstream;
+  }
+
+  /**
+   * Produces the next outgoing batch. The left side is always processed first. If the left batch has no records or
+   * the left outcome is terminal, that outcome is returned and next() is not called on the right side. Otherwise
+   * the right side is processed. Unless it reports a new schema or a terminal outcome, the output batch is then
+   * produced.
+   *
+   * @return IterOutcome state of the lateral join batch
+   */
+  @Override
+  public IterOutcome innerNext() {
+    // No special handling of the FIRST state: always start with the left side.
+    final IterOutcome leftOutcome = processLeftBatch(left);
+
+    // Clear the deferral flag once processLeftBatch() has run.
+    processLeftBatchInFuture = false;
+
+    // Nothing to join, or the left side is finished: do not touch the right side.
+    final boolean noLeftRecords = left.getRecordCount() == 0;
+    if (noLeftRecords || isTerminalOutcome(leftOutcome)) {
+      return leftOutcome;
+    }
+
+    // The left batch has records, so fetch the matching right batch.
+    final IterOutcome rightOutcome = processRightBatch(right);
+
+    if (rightOutcome == OK_NEW_SCHEMA) {
+      // The right side (e.g. UNNEST) is assumed to send OK_NEW_SCHEMA with an empty batch. Pass the empty batch
+      // downstream and treat both sides as OK from now on.
+      leftUpstream = OK;
+      rightUpstream = OK;
+      return rightOutcome;
+    }
+
+    if (isTerminalOutcome(rightOutcome)) {
+      return rightOutcome;
+    }
+
+    // A new schema arrived only with a non-empty left batch: rebuild the output for the new left schema and the
+    // existing right schema, and report STOP downstream if that fails.
+    if (leftUpstream == OK_NEW_SCHEMA) {
+      final boolean schemaHandled = handleSchemaChange();
+      if (!schemaHandled) {
+        return STOP;
+      }
+    }
+
+    // First time through: bind the left, right and outgoing containers in the generated joiner.
+    if (state == BatchState.FIRST) {
+      lateralJoiner.setupLateralJoin(context, left, right, this, popConfig.getJoinType());
+      state = BatchState.NOT_FIRST;
+    }
+
+    // Allocate space for the outgoing batch, then fill it.
+    allocateVectors();
+    return produceOutputBatch();
+  }
+
+  /**
+   * Fills the outgoing container and finalizes it before returning. It joins the current left record with the
+   * current right batch. While the container has room, it pulls further right batches, and further left batches
+   * once the current left batch is exhausted.
+   *
+   * @return the outcome to report downstream: a terminal right-side outcome as-is; EMIT or OK_NEW_SCHEMA when that
+   *         is the outcome of the processed left batch; otherwise OK
+   */
+  private IterOutcome produceOutputBatch() {
+
+    // Try to fully pack the outgoing container
+    while (outputRecords < LateralJoinBatch.MAX_BATCH_SIZE) {
+      final int previousOutputCount = outputRecords;
+      // Invoke the runtime generated method to emit records in the output batch for the current leftJoinIndex.
+      // The returned value is the running total of records in the outgoing container.
+      outputRecords = lateralJoiner.crossJoinAndOutputRecords(leftJoinIndex, rightJoinIndex);
+
+      // We have produced some records in outgoing container, hence there must be a match found for left record
+      if (outputRecords > previousOutputCount) {
+        matchedRecordFound = true;
+      }
+
+      if (right.getRecordCount() == 0) {
+        rightJoinIndex = -1;
+      } else {
+        // One right batch might span multiple output batches, and one output batch may contain records from
+        // several right batches. outputRecords is a running total for the outgoing container, so advance
+        // rightJoinIndex only by the records produced in this call. Adding the running total would skip right
+        // records whenever the container fills up part-way through a right batch.
+        rightJoinIndex += outputRecords - previousOutputCount;
+      }
+
+      final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount();
+
+      // Check if above join to produce output was based on empty right batch or it resulted in right side batch
+      // being fully consumed. In this scenario only if rightUpstream is EMIT then increase the leftJoinIndex.
+      // Otherwise it means for the given right batch there is still some record left to be processed.
+      if (isRightProcessed) {
+        if (rightUpstream == EMIT) {
+          if (!matchedRecordFound) {
+            // will only produce left side in case of LEFT join
+            lateralJoiner.generateLeftJoinOutput(leftJoinIndex);
+          }
+          ++leftJoinIndex;
+          // Reset matchedRecord for next left index record
+          matchedRecordFound = false;
+        }
+
+        // Release vectors of right batch. This will happen for both rightUpstream = EMIT/OK
+        VectorAccessibleUtilities.clear(right);
+        rightJoinIndex = -1;
+      }
+
+      // Check if previous left record was last one, then set leftJoinIndex to -1
+      final boolean isLeftProcessed = leftJoinIndex >= left.getRecordCount();
+      if (isLeftProcessed) {
+        leftJoinIndex = -1;
+        VectorAccessibleUtilities.clear(left);
+      }
+
+      // Check if output batch still has some space
+      if (outputRecords < MAX_BATCH_SIZE) {
+        // Check if left side still has records or not
+        if (isLeftProcessed) {
+          // The left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer
+          if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) {
+            break;
+          } else {
+            // Get both left batch and the right batch and make sure indexes are properly set.
+            // NOTE(review): when outputRecords == 0, processLeftBatch() can return OK_NEW_SCHEMA for a non-empty
+            // left batch without handleSchemaChange() being called on this path -- confirm this is handled.
+            leftUpstream = processLeftBatch(left);
+
+            if (processLeftBatchInFuture) {
+              // We should return the current output batch with OK outcome and don't reset the leftUpstream
+              finalizeOutputContainer();
+              return OK;
+            }
+          }
+        }
+
+        // If we are here it means one of the below:
+        // 1) Either previous left batch was not fully processed and it came with OK outcome. There is still some
+        // space left in outgoing batch so let's get next right batch.
+        // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in
+        // outgoing batch. Now we have got new left batch with OK outcome. Let's get next right batch
+        //
+        // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome
+
+        rightUpstream = processRightBatch(right);
+
+        Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch");
+
+        if (isTerminalOutcome(rightUpstream)) {
+          return rightUpstream;
+        }
+      }
+    } // output batch is full to its max capacity
+
+    finalizeOutputContainer();
+
+    if (leftUpstream == EMIT) {
+      return EMIT;
+    }
+
+    if (leftUpstream == OK_NEW_SCHEMA) {
+      // return output batch with OK_NEW_SCHEMA and reset the state to OK
+      leftUpstream = OK;
+      return OK_NEW_SCHEMA;
+    }
+
+    return OK;
+  }
+
+  /**
+   * Finalizes the current output container with the records produced so far, before it is sent downstream.
+   * Sets the value count of every output vector and the container record count to {@code outputRecords},
+   * rebuilds the container schema without a selection vector, and then resets {@code outputRecords} to 0
+   * so the next output batch starts empty.
+   */
+  private void finalizeOutputContainer() {
+
+    VectorAccessibleUtilities.setValueCount(container, outputRecords);
+
+    // Set the record count in the container
+    container.setRecordCount(outputRecords);
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+    // Parameterized message: the final string is only built when debug logging is enabled
+    logger.debug("Number of records emitted: {}", outputRecords);
+
+    // We are about to send the output batch, so reset outputRecords for the next call
+    outputRecords = 0;
+  }
+
+  private void killAndDrainIncoming(RecordBatch batch, IterOutcome outcome,
+                                    int batchIndex) {
+    if (!hasMore(outcome)) {
+      return;
+    }
+
+    batch.kill(true);
+    while (hasMore(outcome)) {
+      for (final VectorWrapper<?> wrapper : batch) {
+        wrapper.getValueVector().clear();
+      }
+      outcome = next(batchIndex, batch);
+    }
+    if (batchIndex == RIGHT_INPUT) {
+      rightUpstream = outcome;
+    } else {
+      leftUpstream = outcome;
+    }
+  }
+
+  private boolean hasMore(IterOutcome outcome) {
+    return outcome == OK || outcome == OK_NEW_SCHEMA || outcome == EMIT;
+  }
+
+  /**
+   * Checks whether the schema changed between {@code oldSchema} and {@code newSchema}, based on
+   * {@link BatchSchema#isEquivalent(BatchSchema)}.
+   *
+   * @param newSchema newly received schema
+   * @param oldSchema previously known schema to compare against
+   * @return true if either schema is null or the two schemas are not equivalent;
+   *         false if the schemas are equivalent (no change)
+   */
+  private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) {
+    // A missing schema on either side cannot be shown to be unchanged; also avoids dereferencing null
+    if (newSchema == null || oldSchema == null) {
+      return true;
+    }
+    // Equivalent schemas mean "not changed", hence the negation
+    return !newSchema.isEquivalent(oldSchema);
+  }
+
+  /**
+   * Helps to create the outgoing container vectors based on known left and 
right batch schemas
+   * @throws SchemaChangeException
+   */
+  private void setupNewSchema() throws SchemaChangeException {
+
+    // Clear up the container
+    container.clear();
+    leftSchema = left.getSchema();
+    rightSchema = right.getSchema();
+
+    if (leftSchema == null || rightSchema == null) {
+      throw new SchemaChangeException("Either of left or right schema was not 
set properly in the batches. Hence " +
+        "failed to setupNewSchema");
+    }
+
+    // Setup LeftSchema in outgoing container
+    for (final VectorWrapper<?> vectorWrapper : left) {
+      container.addOrGet(vectorWrapper.getField());
+    }
+
+    // Setup RightSchema in the outgoing container
+    for (final VectorWrapper<?> vectorWrapper : right) {
+      MaterializedField rightField = vectorWrapper.getField();
+      TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType();
+
+      // make right input schema optional if we have LEFT join
+      if (popConfig.getJoinType() == JoinRelType.LEFT &&
+        rightFieldType.getMode() == TypeProtos.DataMode.REQUIRED) {
+        final TypeProtos.MajorType outputType =
+          Types.overrideMode(rightField.getType(), 
TypeProtos.DataMode.OPTIONAL);
+
+        // Create the right field with optional type. This will also take care 
of creating
+        // children fields in case of ValueVectors of map type
+        rightField = rightField.withType(outputType);
+      }
+      container.addOrGet(rightField);
+    }
+
+    // Let's build schema for the container
+    container.setRecordCount(0);
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+  }
+
+  /**
+   * Generates the runtime class implementing {@link LateralJoin} for the current left and right schemas. Besides
+   * the setup code that binds the input and output value vector references, two methods are generated:
+   * 1. emitLeft(leftIndex, outIndex) -> copies the left record at leftIndex into the output container at outIndex
+   * 2. emitRight(rightIndex, outIndex) -> copies the right record at rightIndex into the output container at
+   *    outIndex
+   * Output field ids are assigned left columns first, then right columns, in schema order.
+   *
+   * @return the runtime generated class that implements the LateralJoin interface
+   * @throws SchemaChangeException if creating the generated class fails (wraps the IOException or
+   *         ClassTransformationException)
+   */
+  private LateralJoin setupWorker() throws SchemaChangeException {
+    final CodeGenerator<LateralJoin> lateralCG = CodeGenerator.get(
+      LateralJoin.TEMPLATE_DEFINITION, context.getOptions());
+    lateralCG.plainJavaCapable(true);
+
+    // To enable the code gen dump for lateral, use the setting ExecConstants.ENABLE_CODE_DUMP_DEBUG_LATERAL
+    lateralCG.saveCodeForDebugging(enableLateralCGDebugging);
+    final ClassGenerator<LateralJoin> nLJClassGenerator = lateralCG.getRoot();
+
+    // generate emitLeft
+    nLJClassGenerator.setMappingSet(emitLeftMapping);
+    JExpression outIndex = JExpr.direct("outIndex");
+    JExpression leftIndex = JExpr.direct("leftIndex");
+
+    // fieldId indexes the incoming batch and restarts for each side; outputFieldId indexes the outgoing container
+    // and keeps increasing across both loops, so right columns are placed after all left columns
+    int fieldId = 0;
+    int outputFieldId = 0;
+    if (leftSchema != null) {
+      // Set the input and output value vector references corresponding to the left batch
+      for (MaterializedField field : leftSchema) {
+        final TypeProtos.MajorType fieldType = field.getType();
+
+        // Add the vector to the output container
+        container.addOrGet(field);
+
+        JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("leftBatch",
+            new TypedFieldId(fieldType, false, fieldId));
+        JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
+            new TypedFieldId(fieldType, false, outputFieldId));
+
+        // Generated statement: outVV.copyFromSafe(leftIndex, outIndex, inVV)
+        nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(leftIndex).arg(outIndex).arg(inVV));
+        nLJClassGenerator.rotateBlock();
+        fieldId++;
+        outputFieldId++;
+      }
+    }
+
+    // generate emitRight: input field ids restart at 0 for the right batch, outputFieldId continues from the left
+    fieldId = 0;
+    nLJClassGenerator.setMappingSet(emitRightMapping);
+    JExpression rightIndex = JExpr.direct("rightIndex");
+
+    if (rightSchema != null) {
+      // Set the input and output value vector references corresponding to the right batch
+      for (MaterializedField field : rightSchema) {
+
+        final TypeProtos.MajorType inputType = field.getType();
+        TypeProtos.MajorType outputType;
+        // if the join type is LEFT, make sure the data mode of the right batch output fields is optional
+        if (popConfig.getJoinType() == JoinRelType.LEFT && inputType.getMode() == TypeProtos.DataMode.REQUIRED) {
+          outputType = Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL);
+        } else {
+          outputType = inputType;
+        }
+
+        // NOTE(review): unlike withType() in setupNewSchema, create() presumably does not carry map children;
+        // this appears to rely on setupNewSchema having already added the column under the same name — confirm
+        MaterializedField newField = MaterializedField.create(field.getName(), outputType);
+        container.addOrGet(newField);
+
+        JVar inVV = nLJClassGenerator.declareVectorValueSetupAndMember("rightBatch",
+            new TypedFieldId(inputType, false, fieldId));
+        JVar outVV = nLJClassGenerator.declareVectorValueSetupAndMember("outgoing",
+            new TypedFieldId(outputType, false, outputFieldId));
+        // Generated statement: outVV.copyFromSafe(rightIndex, outIndex, inVV)
+        nLJClassGenerator.getEvalBlock().add(outVV.invoke("copyFromSafe")
+            .arg(rightIndex)
+            .arg(outIndex)
+            .arg(inVV));
+        nLJClassGenerator.rotateBlock();
+        fieldId++;
+        outputFieldId++;
+      }
+    }
+
+    try {
+      return context.getImplementationClass(lateralCG);
+    } catch (IOException | ClassTransformationException ex) {
+      throw new SchemaChangeException("Failed while setting up generated class with new schema information", ex);
+    }
+  }
+
+  /**
+   * Allocates every vector of the outgoing container to hold {@code MAX_BATCH_SIZE} values.
+   */
+  private void allocateVectors() {
+    for (final VectorWrapper<?> outgoingWrapper : container) {
+      AllocationHelper.allocateNew(outgoingWrapper.getValueVector(), MAX_BATCH_SIZE);
+    }
+  }
+
+  /**
+   * Prefetch a batch from left and right branch to know about the schema of 
each side. Then adds value vector in
+   * output container based on those schemas. For this phase LATERAL always 
expect's an empty batch from right side
+   * which UNNEST should abide by.
+   *
+   * @throws SchemaChangeException if batch schema was changed during execution
+   */
+  @Override
+  protected void buildSchema() throws SchemaChangeException {
+    // Prefetch a RecordBatch from both left and right branch
+    if (!prefetchFirstBatchFromBothSides()) {
+      return;
+    }
+
+    Preconditions.checkState(right.getRecordCount() == 0, "Unexpected 
non-empty first right batch received");
+
+    // Setup output container schema based on known left and right schema
+    setupNewSchema();
+
+    // Release the vectors received from right side
+    VectorAccessibleUtilities.clear(right);
+
+
+    // We should not allocate memory for all the value vectors inside output 
batch
+    // since this is buildSchema phase and we are sending empty batches 
downstream
+    lateralJoiner = setupWorker();
+
+    // Set join index as invalid (-1) if the left side is empty, else set it 
to 0
+    leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0;
+    rightJoinIndex = -1;
+
+    // Reset the left side of the IterOutcome since for this call, 
OK_NEW_SCHEMA will be returned correctly
+    // by buildSchema caller and we should treat the batch as received with OK 
outcome.
+    leftUpstream = OK;
+    rightUpstream = OK;
+  }
+
+  /**
+   * Closes this operator. No LateralJoin-specific cleanup happens here; this only delegates to the superclass.
+   */
+  @Override
+  public void close() {
+    super.close();
+  }
+
+  /**
+   * Forwards a kill request to both inputs, the left one first and then the right one.
+   *
+   * @param sendUpstream passed unchanged to each input's {@code kill}
+   */
+  @Override
+  protected void killIncoming(boolean sendUpstream) {
+    left.kill(sendUpstream);
+    right.kill(sendUpstream);
+  }
+
+  /**
+   * @return the record count currently set on the outgoing container
+   */
+  @Override
+  public int getRecordCount() {
+    return container.getRecordCount();
+  }
+
+  /**
+   * Returns the left side incoming batch of the Lateral Join. Used by the leaf operator of Lateral's right branch
+   * to process the record at {@code leftJoinIndex}.
+   *
+   * @return the RecordBatch received as the left side incoming
+   * @throws IllegalStateException if the left batch is null
+   */
+  @Override
+  public RecordBatch getIncoming() {
+    Preconditions.checkState(left != null, "Returning null left batch. It's unexpected since right side will " +
+      "only be called iff there is any valid left batch");
+    return left;
+  }
+
+  /**
+   * Returns the index of the row in the current left incoming batch that the calling operator should process.
+   * LATERAL should never return -1 here, since -1 indicates the current left batch is empty, and LATERAL never
+   * calls next on the right side with an empty left batch.
+   *
+   * @return index of the left row to process
+   * @throws IllegalStateException if the index is not below the record count of the left batch
+   */
+  @Override
+  public int getRecordIndex() {
+    // Pass a %s template instead of a pre-formatted string so the message is only built when the check fails
+    Preconditions.checkState(leftJoinIndex < left.getRecordCount(),
+      "Left join index: %s is out of bounds: %s", leftJoinIndex, left.getRecordCount());
+    return leftJoinIndex;
+  }
+
+  /**
+   * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} stored for the left
+   * incoming batch (the {@code leftUpstream} field).
+   *
+   * @return the stored left upstream outcome
+   */
+  public IterOutcome getLeftOutcome() {
+    return leftUpstream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
new file mode 100644
index 0000000..019b095
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class LateralJoinBatchCreator implements BatchCreator<LateralJoinPOP> {
+  @Override
+  public LateralJoinBatch getBatch(ExecutorFragmentContext context, 
LateralJoinPOP config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    return new LateralJoinBatch(config, context, children.get(0), 
children.get(1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java
new file mode 100644
index 0000000..6e756a4
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinTemplate.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+import javax.inject.Named;
+
+/**
+ * Template class that, combined with the runtime generated source, implements the {@link LateralJoin} interface.
+ * It contains the main lateral join logic; the generated code supplies {@link #doSetup}, {@link #emitLeft} and
+ * {@link #emitRight}.
+ */
+public abstract class LateralJoinTemplate implements LateralJoin {
+
+  // Current right input batch being processed
+  private RecordBatch right = null;
+
+  // Index in the outgoing container where the next record will be inserted; reset to 0 once it reaches
+  // LateralJoinBatch.MAX_BATCH_SIZE
+  private int outputIndex = 0;
+
+  // Join type captured at setup; generateLeftJoinOutput only emits records when it is LEFT
+  private JoinRelType joinType;
+
+  /**
+   * Initializes the necessary state and invokes {@link #doSetup} to set the input and output value vector
+   * references.
+   *
+   * @param context Fragment context
+   * @param left Current left input batch being processed
+   * @param right Current right input batch being processed
+   * @param outgoing Output batch
+   * @param joinType Join type of this lateral join; {@link #generateLeftJoinOutput(int)} only emits for LEFT
+   */
+  public void setupLateralJoin(FragmentContext context,
+                               RecordBatch left, RecordBatch right,
+                               LateralJoinBatch outgoing, JoinRelType joinType) {
+    this.right = right;
+    this.joinType = joinType;
+    doSetup(context, this.right, left, outgoing);
+  }
+
+  /**
+   * Main entry point for producing output records. Cross joins the left record at {@code leftIndex} with the
+   * records of the current right batch, starting at {@code rightIndex}. Copying stops when the right batch is
+   * exhausted or the output index reaches {@link LateralJoinBatch#MAX_BATCH_SIZE}, whichever comes first.
+   *
+   * @param leftIndex index of the record in the left batch to join
+   * @param rightIndex index in the right batch to start copying from; must be non-negative when the right
+   *                   batch has records
+   * @return the output index reached after copying (the running record count of the current output batch).
+   *         It equals {@code MAX_BATCH_SIZE} when the batch became full, in which case the internal output index
+   *         is reset to 0. If the right batch is empty, the current output index is returned unchanged.
+   * @throws IllegalStateException if the right batch has records but {@code rightIndex} is negative
+   */
+  public int crossJoinAndOutputRecords(int leftIndex, int rightIndex) {
+
+    final int rightRecordCount = right.getRecordCount();
+    int currentOutputIndex = outputIndex;
+
+    // If there is no record in the right batch, just return the current index in the output batch
+    if (rightRecordCount <= 0) {
+      return currentOutputIndex;
+    }
+
+    // The right batch has records, so a valid start index is required (-1 means "no right index")
+    Preconditions.checkState(rightIndex >= 0, "Right batch record count is %s (>0) but right index is %s",
+      rightRecordCount, rightIndex);
+
+    // For every remaining right record, emit the left record and the right record into the same output row
+    for (; rightIndex < rightRecordCount; ++rightIndex) {
+      emitLeft(leftIndex, currentOutputIndex);
+      emitRight(rightIndex, currentOutputIndex);
+      ++currentOutputIndex;
+
+      // Stop once the output batch is full
+      if (currentOutputIndex >= LateralJoinBatch.MAX_BATCH_SIZE) {
+        break;
+      }
+    }
+
+    updateOutputIndex(currentOutputIndex);
+    return currentOutputIndex;
+  }
+
+  /**
+   * If the current output batch is full, resets the output index for the next output batch. Otherwise there is
+   * still space left in the output batch, so the next call continues populating from {@code newOutputIndex}.
+   *
+   * @param newOutputIndex new output index of the outgoing batch after copying the records
+   */
+  private void updateOutputIndex(int newOutputIndex) {
+    outputIndex = (newOutputIndex >= LateralJoinBatch.MAX_BATCH_SIZE) ?
+      0 : newOutputIndex;
+  }
+
+  /**
+   * Copies only the left batch record at {@code leftIndex} to the output. The right side values of that row are
+   * not written here; they are intended to be NULL. Used when the join type is LEFT and only empty batches have
+   * been seen from the right side. Does nothing for other join types.
+   *
+   * @param leftIndex index in the left batch to copy the record from
+   */
+  public void generateLeftJoinOutput(int leftIndex) {
+    int currentOutputIndex = outputIndex;
+
+    if (JoinRelType.LEFT == joinType) {
+      emitLeft(leftIndex, currentOutputIndex++);
+      updateOutputIndex(currentOutputIndex);
+    }
+  }
+
+  /**
+   * Generated method to set up vector references in the right, left and outgoing batches. It should be called
+   * after the initial schema build phase, once the schema of the outgoing container is known, and again after
+   * each new schema discovered during execution.
+   *
+   * @param context fragment context
+   * @param rightBatch right incoming batch
+   * @param leftBatch left incoming batch
+   * @param outgoing output batch
+   */
+  public abstract void doSetup(@Named("context") FragmentContext context,
+                               @Named("rightBatch") RecordBatch rightBatch,
+                               @Named("leftBatch") RecordBatch leftBatch,
+                               @Named("outgoing") RecordBatch outgoing);
+
+  /**
+   * Generated method to copy the record at {@code rightIndex} of the right batch to the outgoing batch at
+   * {@code outIndex}.
+   *
+   * @param rightIndex index to copy the record from in the right batch
+   * @param outIndex index to copy the record to in the outgoing batch
+   */
+  public abstract void emitRight(@Named("rightIndex") int rightIndex,
+                                 @Named("outIndex") int outIndex);
+
+  /**
+   * Generated method to copy the record at {@code leftIndex} of the left batch to the outgoing batch at
+   * {@code outIndex}.
+   *
+   * @param leftIndex index to copy the record from in the left batch
+   * @param outIndex index to copy the record to in the outgoing batch
+   */
+  public abstract void emitLeft(@Named("leftIndex") int leftIndex,
+                                @Named("outIndex") int outIndex);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index 6de4df6..881954c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -56,6 +56,7 @@ public class RecordBatchData {
         throw new UnsupportedOperationException("Record batch data can't be 
created based on a hyper batch.");
       }
       TransferPair tp = v.getValueVector().getTransferPair(allocator);
+      // The transfer ensures the memory for this value vector is released from the source container.
       tp.transfer();
       vectors.add(tp.getTo());
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index 94aef07..b671915 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -51,7 +51,11 @@ public abstract class AbstractBinaryRecordBatch<T extends 
PhysicalOperator> exte
    *         false if caller should stop and exit from processing.
    */
   protected boolean prefetchFirstBatchFromBothSides() {
+    // The first left batch may contain zero or more records and arrives with an OK_NEW_SCHEMA outcome
     leftUpstream = next(0, left);
+
+    // The first right batch always contains zero records and arrives with an OK_NEW_SCHEMA outcome, since
+    // Lateral is currently always paired with UNNEST
     rightUpstream = next(1, right);
 
     if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) 
{
@@ -69,6 +73,13 @@ public abstract class AbstractBinaryRecordBatch<T extends 
PhysicalOperator> exte
       return false;
     }
 
+    // EMIT outcome is not expected as part of first batch from either side
+    if (leftUpstream == IterOutcome.EMIT || rightUpstream == IterOutcome.EMIT) 
{
+      state = BatchState.STOP;
+      throw new IllegalStateException("Unexpected IterOutcome.EMIT received 
either from left or right side in " +
+        "buildSchema phase");
+    }
+
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5a63e274/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 015d078..8bf1856 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -126,6 +126,7 @@ public abstract class AbstractRecordBatch<T extends 
PhysicalOperator> implements
       stats.batchReceived(inputIndex, b.getRecordCount(), true);
       break;
     case OK:
+    case EMIT:
       stats.batchReceived(inputIndex, b.getRecordCount(), false);
       break;
     default:

Reply via email to