This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 0a4a37e11f2f2ad66de80918a4b34665e42803fc
Author: chunhui-shi <[email protected]>
AuthorDate: Mon Mar 26 12:15:09 2018 -0700

    DRILL-6321: Lateral Join and Unnest - initial implementation for parser and 
planning
---
 .../org/apache/drill/exec/opt/BasicOptimizer.java  |   6 ++
 .../physical/base/AbstractPhysicalVisitor.java     |   6 ++
 .../drill/exec/physical/base/PhysicalVisitor.java  |   2 +
 .../drill/exec/physical/config/LateralJoinPOP.java |  22 +++-
 .../drill/exec/physical/config/UnnestPOP.java      |  32 ++++--
 .../impl/join/LateralJoinBatchCreator.java         |   9 +-
 .../physical/impl/unnest/UnnestRecordBatch.java    |  20 +++-
 .../apache/drill/exec/planner/PlannerPhase.java    |  10 +-
 .../exec/planner/common/DrillCorrelateRelBase.java |  55 ++++++++++
 .../exec/planner/common/DrillUnnestRelBase.java    |  54 ++++++++++
 .../drill/exec/planner/fragment/Materializer.java  |  38 +++++++
 .../exec/planner/logical/DrillCorrelateRel.java    |  51 +++++++++
 .../exec/planner/logical/DrillCorrelateRule.java   |  53 ++++++++++
 .../logical/DrillUnnestRel.java}                   |  32 ++++--
 .../exec/planner/logical/DrillUnnestRule.java      |  48 +++++++++
 .../drill/exec/planner/physical/CorrelatePrel.java |  89 ++++++++++++++++
 .../exec/planner/physical/CorrelatePrule.java      |  56 ++++++++++
 .../drill/exec/planner/physical/UnnestPrel.java    |  78 ++++++++++++++
 .../drill/exec/planner/physical/UnnestPrule.java   |  49 +++++++++
 .../impl/lateraljoin/TestLateralPhysicalPlan.java  |  88 ++++++++++++++++
 .../test/resources/lateraljoin/lateralplan1.json   |  95 +++++++++++++++++
 .../resources/lateraljoin/nested-customer.json     | 114 +++++++++++++++++++++
 .../resources/lateraljoin/nested-customer.parquet  | Bin 0 -> 3360 bytes
 .../drill/common/logical/data/LateralJoin.java     |  45 ++++----
 .../apache/drill/common/logical/data/Unnest.java   |  33 ++++--
 .../data/visitors/AbstractLogicalVisitor.java      |  12 +++
 .../logical/data/visitors/LogicalVisitor.java      |   5 +
 27 files changed, 1046 insertions(+), 56 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 9b04e94..36e74a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -37,6 +37,7 @@ import org.apache.drill.common.logical.data.Project;
 import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.common.logical.data.SinkOperator;
 import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.common.logical.data.Unnest;
 import org.apache.drill.common.logical.data.Window;
 import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
 import org.apache.drill.common.types.TypeProtos;
@@ -238,5 +239,10 @@ public class BasicOptimizer extends Optimizer {
       final PhysicalOperator child = filter.getInput().accept(this, obj);
       return new SelectionVectorRemover(new 
org.apache.drill.exec.physical.config.Filter(child, filter.getExpr(), 1.0f));
     }
+
+    @Override
+    public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) 
throws OptimizerException {
+      return new org.apache.drill.exec.physical.config.UnnestPOP(null, 
unnest.getColumn());
+    }
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 86a31c1..340c303 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
@@ -181,6 +182,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E 
extends Throwable> impleme
   }
 
   @Override
+  public T visitLateralJoin(LateralJoinPOP lateralJoinPOP, X value) throws E {
+    return visitOp(lateralJoinPOP, value);
+  }
+
+  @Override
   public T visitIteratorValidator(IteratorValidator op, X value) throws E {
     return visitOp(op, value);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 5c926ad..f2e53eb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
@@ -78,6 +79,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends 
Throwable> {
   public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP;
   public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws 
EXCEP;
   public RETURN visitUnnest(UnnestPOP unnest, EXTRA value) throws EXCEP;
+  public RETURN visitLateralJoin(LateralJoinPOP lateralJoinPOP, EXTRA value) 
throws EXCEP;
 
   public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) 
throws EXCEP;
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index 946b4a6..fab89a2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.exec.physical.base.AbstractJoinPop;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import java.util.List;
@@ -33,6 +34,9 @@ import java.util.List;
 public class LateralJoinPOP extends AbstractJoinPop {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class);
 
+  @JsonProperty("unnestForLateralJoin")
+  private UnnestPOP unnestForLateralJoin;
+
   @JsonCreator
   public LateralJoinPOP(
       @JsonProperty("left") PhysicalOperator left,
@@ -45,11 +49,27 @@ public class LateralJoinPOP extends AbstractJoinPop {
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.size() == 2,
       "Lateral join should have two physical operators");
-    return new LateralJoinPOP(children.get(0), children.get(1), joinType);
+    LateralJoinPOP newPOP =  new LateralJoinPOP(children.get(0), 
children.get(1), joinType);
+    newPOP.unnestForLateralJoin = this.unnestForLateralJoin;
+    return newPOP;
+  }
+
+  @JsonProperty("unnestForLateralJoin")
+  public UnnestPOP getUnnestForLateralJoin() {
+    return this.unnestForLateralJoin;
+  }
+
+  public void setUnnestForLateralJoin(UnnestPOP unnest) {
+    this.unnestForLateralJoin = unnest;
   }
 
   @Override
   public int getOperatorType() {
     return CoreOperatorType.LATERAL_JOIN_VALUE;
   }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+    return physicalVisitor.visitLateralJoin(this, value);
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
index 6e35134..37ba495 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
@@ -18,36 +18,49 @@
 package org.apache.drill.exec.physical.config;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.Iterators;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.Leaf;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.impl.unnest.UnnestRecordBatch;
 
 import java.util.Iterator;
 
 import static 
org.apache.drill.exec.proto.UserBitShared.CoreOperatorType.UNNEST_VALUE;
 
 @JsonTypeName("unnest")
-public class UnnestPOP extends AbstractSingle {
+public class UnnestPOP extends AbstractBase implements Leaf {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(UnnestPOP.class);
 
   private SchemaPath column;
 
+  @JsonIgnore
+  private UnnestRecordBatch unnestBatch;
+
   @JsonCreator
   public UnnestPOP(
       @JsonProperty("child") PhysicalOperator child, // Operator with incoming 
record batch
       @JsonProperty("column") SchemaPath column) {
-    super(child);
     this.column = column;
   }
 
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) 
throws ExecutionSetupException {
+    assert children.isEmpty();
+    UnnestPOP newUnnest = new UnnestPOP(null, column);
+    newUnnest.addUnnestBatch(this.unnestBatch);
+    return newUnnest;
+  }
 
   @Override
   public Iterator<PhysicalOperator> iterator() {
-    return Iterators.singletonIterator(child);
+    return Iterators.emptyIterator();
   }
 
   public SchemaPath getColumn() {
@@ -59,10 +72,13 @@ public class UnnestPOP extends AbstractSingle {
     return physicalVisitor.visitUnnest(this, value);
   }
 
-  @Override
-  public PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    UnnestPOP unnest =  new UnnestPOP(child, column);
-    return unnest;
+  public void addUnnestBatch(UnnestRecordBatch unnestBatch) {
+    this.unnestBatch = unnestBatch;
+  }
+
+  @JsonIgnore
+  public UnnestRecordBatch getUnnestBatch() {
+    return this.unnestBatch;
   }
 
   @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
index 6ed593d..f868596 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
@@ -19,7 +19,9 @@ package org.apache.drill.exec.physical.impl.join;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.physical.config.UnnestPOP;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 
@@ -29,6 +31,11 @@ public class LateralJoinBatchCreator implements 
BatchCreator<LateralJoinPOP> {
   @Override
   public LateralJoinBatch getBatch(ExecutorFragmentContext context, 
LateralJoinPOP config, List<RecordBatch> children)
       throws ExecutionSetupException {
-    return new LateralJoinBatch(config, context, children.get(0), 
children.get(1));
+    LateralJoinBatch ljBatch = new LateralJoinBatch(config, context, 
children.get(0), children.get(1));
+    UnnestPOP unnest = config.getUnnestForLateralJoin();
+    if (unnest != null) {
+      unnest.getUnnestBatch().setIncoming((LateralContract)ljBatch);
+    }
+    return ljBatch;
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index d366c80..3ef8179 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.unnest;
 
 import com.google.common.base.Preconditions;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
@@ -28,6 +29,9 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.physical.impl.join.LateralJoinBatch;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
@@ -37,9 +41,11 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
+import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
@@ -81,6 +87,7 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
     }
   }
 
+
   /**
    * Memory manager for Unnest. Estimates the batch size exactly like we do 
for Flatten.
    */
@@ -134,6 +141,7 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
 
   public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws 
OutOfMemoryException {
     super(pop, context);
+    pop.addUnnestBatch(this);
     // get the output batch size from config.
     int configuredBatchSize = (int) 
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     memoryManager = new UnnestMemoryManager(configuredBatchSize);
@@ -166,7 +174,6 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
     hasRemainder = false; // whatever the case, we need to stop processing the 
current row.
   }
 
-
   @Override
   public IterOutcome innerNext() {
 
@@ -261,7 +268,6 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
     unnest.setUnnestField(vector);
   }
 
-  @Override
   protected IterOutcome doWork() {
     Preconditions.checkNotNull(lateral);
     memoryManager.update();
@@ -355,7 +361,15 @@ public class UnnestRecordBatch extends 
AbstractTableFunctionRecordBatch<UnnestPO
 
     final ValueVector unnestVector = transferPair.getTo();
     transfers.add(transferPair);
-    container.add(unnestVector);
+    if (unnestVector instanceof MapVector) {
+      Iterator<ValueVector> it = unnestVector.iterator();
+      while (it.hasNext()) {
+        container.add(it.next());
+      }
+    }
+    else {
+      container.add(unnestVector);
+    }
     logger.debug("Added transfer for unnest expression.");
     container.buildSchema(SelectionVectorMode.NONE);
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 17fedc4..3196bd0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -28,6 +28,7 @@ import org.apache.calcite.tools.RuleSet;
 import org.apache.calcite.tools.RuleSets;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.planner.logical.DrillAggregateRule;
+import org.apache.drill.exec.planner.logical.DrillCorrelateRule;
 import org.apache.drill.exec.planner.logical.DrillFilterAggregateTransposeRule;
 import org.apache.drill.exec.planner.logical.DrillFilterItemStarReWriterRule;
 import org.apache.drill.exec.planner.logical.DrillFilterJoinRules;
@@ -48,11 +49,13 @@ import 
org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.DrillScanRule;
 import org.apache.drill.exec.planner.logical.DrillSortRule;
 import org.apache.drill.exec.planner.logical.DrillUnionAllRule;
+import org.apache.drill.exec.planner.logical.DrillUnnestRule;
 import org.apache.drill.exec.planner.logical.DrillValuesRule;
 import org.apache.drill.exec.planner.logical.DrillWindowRule;
 import org.apache.drill.exec.planner.logical.partition.ParquetPruneScanRule;
 import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
 import org.apache.drill.exec.planner.physical.ConvertCountToDirectScan;
+import org.apache.drill.exec.planner.physical.CorrelatePrule;
 import org.apache.drill.exec.planner.physical.DirectScanPrule;
 import org.apache.drill.exec.planner.physical.FilterPrule;
 import org.apache.drill.exec.planner.physical.HashAggPrule;
@@ -70,6 +73,7 @@ import 
org.apache.drill.exec.planner.physical.SortConvertPrule;
 import org.apache.drill.exec.planner.physical.SortPrule;
 import org.apache.drill.exec.planner.physical.StreamAggPrule;
 import org.apache.drill.exec.planner.physical.UnionAllPrule;
+import org.apache.drill.exec.planner.physical.UnnestPrule;
 import org.apache.drill.exec.planner.physical.ValuesPrule;
 import org.apache.drill.exec.planner.physical.WindowPrule;
 import org.apache.drill.exec.planner.physical.WriterPrule;
@@ -307,7 +311,9 @@ public enum PlannerPhase {
       DrillSortRule.INSTANCE,
       DrillJoinRule.INSTANCE,
       DrillUnionAllRule.INSTANCE,
-      DrillValuesRule.INSTANCE
+      DrillValuesRule.INSTANCE,
+      DrillUnnestRule.INSTANCE,
+      DrillCorrelateRule.INSTANCE
       ).build();
 
   /**
@@ -442,6 +448,8 @@ public enum PlannerPhase {
     ruleList.add(UnionAllPrule.INSTANCE);
     ruleList.add(ValuesPrule.INSTANCE);
     ruleList.add(DirectScanPrule.INSTANCE);
+    ruleList.add(UnnestPrule.INSTANCE);
+    ruleList.add(CorrelatePrule.INSTANCE);
 
     ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_PROJECT);
     ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_SCAN);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillCorrelateRelBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillCorrelateRelBase.java
new file mode 100644
index 0000000..ea994ba
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillCorrelateRelBase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+
+
+public abstract class DrillCorrelateRelBase extends Correlate implements 
DrillRelNode {
+  public DrillCorrelateRelBase(RelOptCluster cluster, RelTraitSet traits, 
RelNode left, RelNode right,
+                               CorrelationId correlationId, ImmutableBitSet 
requiredColumns, SemiJoinType semiJoinType) {
+    super(cluster, traits, left, right, correlationId, requiredColumns, 
semiJoinType);
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+                                              RelMetadataQuery mq) {
+    DrillCostBase.DrillCostFactory costFactory = 
(DrillCostBase.DrillCostFactory) planner.getCostFactory();
+
+    double rowCount = mq.getRowCount(this.getLeft());
+    long fieldWidth = PrelUtil.getPlannerSettings(planner).getOptions()
+        .getOption(ExecConstants.AVERAGE_FIELD_WIDTH_KEY).num_val;
+
+    double rowSize = (this.getLeft().getRowType().getFieldList().size()) * 
fieldWidth;
+
+    double cpuCost = rowCount * rowSize * DrillCostBase.BASE_CPU_COST;
+    double memCost = 0;
+    return costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java
new file mode 100644
index 0000000..04bb2d6
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+
+public class DrillUnnestRelBase extends AbstractRelNode implements 
DrillRelNode {
+
+  final protected RexNode ref;
+
+  public DrillUnnestRelBase(RelOptCluster cluster, RelTraitSet traitSet, 
RexNode ref) {
+    super(cluster, traitSet);
+    this.ref = ref;
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
+
+    double rowCount = mq.getRowCount(this);
+    // Attribute a small cost for projecting simple fields. In reality 
projecting simple columns is not free, and
+    // this allows projection pushdown/project-merge rules to kick in, thereby 
eliminating unneeded columns from
+    // the projection.
+    double cpuCost = DrillCostBase.BASE_CPU_COST * rowCount * 
this.getRowType().getFieldCount();
+
+    DrillCostBase.DrillCostFactory costFactory = 
(DrillCostBase.DrillCostFactory) planner.getCostFactory();
+    return costFactory.makeCost(rowCount, cpuCost, 0, 0);
+  }
+
+  public RexNode getRef() {
+    return this.ref;
+  }
+}
\ No newline at end of file
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 2fc7541..987e65c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -30,6 +30,8 @@ import org.apache.drill.exec.physical.base.Store;
 import org.apache.drill.exec.physical.base.SubScan;
 
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.physical.config.UnnestPOP;
 
 public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, 
Materializer.IndexedFragmentNode, ExecutionSetupException>{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Materializer.class);
@@ -106,10 +108,38 @@ public class Materializer extends 
AbstractPhysicalVisitor<PhysicalOperator, Mate
     return newOp;
   }
 
+
+  @Override
+  public PhysicalOperator visitLateralJoin(LateralJoinPOP op, 
IndexedFragmentNode iNode) throws ExecutionSetupException {
+    iNode.addAllocation(op);
+    List<PhysicalOperator> children = Lists.newArrayList();
+
+    children.add(op.getLeft().accept(this, iNode));
+    children.add(op.getRight().accept(this, iNode));
+    UnnestPOP unnestInLeftInput = iNode.getUnnest();
+
+    PhysicalOperator newOp = op.getNewWithChildren(children);
+    newOp.setCost(op.getCost());
+    newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId());
+
+    ((LateralJoinPOP)newOp).setUnnestForLateralJoin(unnestInLeftInput);
+
+    return newOp;
+  }
+
+  @Override
+  public PhysicalOperator visitUnnest(UnnestPOP unnest, IndexedFragmentNode 
value) throws ExecutionSetupException {
+    PhysicalOperator newOp = visitOp(unnest, value);
+    value.addUnnest((UnnestPOP)newOp);
+    return newOp;
+  }
+
   public static class IndexedFragmentNode{
     final Wrapper info;
     final int minorFragmentId;
 
+    UnnestPOP unnest = null;
+
     public IndexedFragmentNode(int minorFragmentId, Wrapper info) {
       super();
       this.info = info;
@@ -132,6 +162,14 @@ public class Materializer extends 
AbstractPhysicalVisitor<PhysicalOperator, Mate
       info.addAllocation(pop);
     }
 
+    public void addUnnest(UnnestPOP unnest) {
+      this.unnest = unnest;
+    }
+
+    public UnnestPOP getUnnest() {
+      return this.unnest;
+    }
+
   }
 
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRel.java
new file mode 100644
index 0000000..e5eee4e
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.common.DrillCorrelateRelBase;
+
+
+public class DrillCorrelateRel extends DrillCorrelateRelBase implements 
DrillRel {
+
+  protected DrillCorrelateRel(RelOptCluster cluster, RelTraitSet traits, 
RelNode left, RelNode right,
+                              CorrelationId correlationId, ImmutableBitSet 
requiredColumns, SemiJoinType semiJoinType) {
+    super(cluster, traits, left, right, correlationId, requiredColumns, 
semiJoinType);
+  }
+
+  @Override
+  public Correlate copy(RelTraitSet traitSet,
+        RelNode left, RelNode right, CorrelationId correlationId,
+        ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+    return new DrillCorrelateRel(this.getCluster(), this.getTraitSet(), left, 
right, correlationId, requiredColumns,
+        this.getJoinType());
+  }
+
+  @Override
+  public LogicalOperator implement(DrillImplementor implementor) {
+    // TODO: implement direct conversion from RelNode to logical operator 
for explainPlan
+    return null;
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
new file mode 100644
index 0000000..8ac4fb1
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.slf4j.Logger;
+
+/**
+ * Planner rule that turns a {@link LogicalCorrelate} in
+ * {@link Convention#NONE} into a {@link DrillCorrelateRel}.
+ */
+public class DrillCorrelateRule extends RelOptRule {
+  public static final RelOptRule INSTANCE = new DrillCorrelateRule();
+  protected static final Logger tracer = CalciteTrace.getPlannerTracer();
+
+  private DrillCorrelateRule() {
+    super(
+        RelOptHelper.any(LogicalCorrelate.class, Convention.NONE),
+        DrillRelFactories.LOGICAL_BUILDER,
+        "DrillCorrelateRule");
+  }
+
+  /**
+   * Replaces the matched correlate with a {@link DrillCorrelateRel} whose
+   * inputs have been requested with the {@code DRILL_LOGICAL} trait.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final LogicalCorrelate logicalCorrelate = call.rel(0);
+
+    // Inputs are converted left first, then right.
+    final RelNode drillLeft = toDrillLogical(logicalCorrelate.getLeft());
+    final RelNode drillRight = toDrillLogical(logicalCorrelate.getRight());
+
+    final RelTraitSet drillTraits =
+        logicalCorrelate.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+    call.transformTo(new DrillCorrelateRel(
+        logicalCorrelate.getCluster(),
+        drillTraits,
+        drillLeft,
+        drillRight,
+        logicalCorrelate.getCorrelationId(),
+        logicalCorrelate.getRequiredColumns(),
+        logicalCorrelate.getJoinType()));
+  }
+
+  /** Requests {@code input} with its own traits plus {@code DRILL_LOGICAL}, simplified. */
+  private static RelNode toDrillLogical(RelNode input) {
+    final RelTraitSet wanted =
+        input.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify();
+    return convert(input, wanted);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRel.java
similarity index 50%
copy from 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
copy to 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRel.java
index 6ed593d..ab827e5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRel.java
@@ -15,20 +15,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.impl.join;
+package org.apache.drill.exec.planner.logical;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.config.LateralJoinPOP;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
 
-import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
+
+
+/**
+ * {@link DrillRel} flavour of the unnest relational node, built on top of
+ * {@link DrillUnnestRelBase}.
+ */
+public class DrillUnnestRel extends DrillUnnestRelBase implements DrillRel {
+
+
+  /**
+   * Creates the node: {@code cluster}, {@code traits} and {@code ref} are
+   * handed to the base class, and {@code rowType} is stored as this node's
+   * row type.
+   */
+  public DrillUnnestRel(RelOptCluster cluster, RelTraitSet traits,
+                        RelDataType rowType, RexNode ref) {
+    super(cluster, traits, ref);
+    this.rowType = rowType;
+  }
 
-public class LateralJoinBatchCreator implements BatchCreator<LateralJoinPOP> {
+  /**
+   * Not implemented yet.
+   *
+   * @return always {@code null}
+   */
   @Override
-  public LateralJoinBatch getBatch(ExecutorFragmentContext context, 
LateralJoinPOP config, List<RecordBatch> children)
-      throws ExecutionSetupException {
-    return new LateralJoinBatch(config, context, children.get(0), 
children.get(1));
+  public LogicalOperator implement(DrillImplementor implementor) {
+    //TODO: implementation for direct convert from RelNode to logical operator 
for explainPlan
+    return null;
   }
+
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRule.java
new file mode 100644
index 0000000..762eb46
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Uncollect;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalValues;
+
+/**
+ * Planner rule that matches an {@link Uncollect} sitting on a
+ * {@link LogicalProject} over a {@link LogicalValues} (the latter in
+ * {@link Convention#NONE}) and replaces the whole pattern with a single
+ * {@link DrillUnnestRel}.
+ */
+public class DrillUnnestRule extends RelOptRule {
+  public static final RelOptRule INSTANCE = new DrillUnnestRule();
+
+  private DrillUnnestRule() {
+    super(RelOptHelper.some(Uncollect.class,
+        RelOptHelper.some(LogicalProject.class,
+            RelOptHelper.any(LogicalValues.class, Convention.NONE))),
+        DrillRelFactories.LOGICAL_BUILDER, "DrillUnnestRule");
+  }
+
+  /**
+   * Builds the {@link DrillUnnestRel} from the row type of the matched
+   * {@link Uncollect} and the first expression of the matched project.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Uncollect uncollect = call.rel(0);
+    final LogicalProject project = call.rel(1);
+    // The LogicalValues operand (rel 2) only constrains the match; nothing is read from it.
+
+    final RelTraitSet traits = uncollect.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+    DrillUnnestRel unnest = new DrillUnnestRel(uncollect.getCluster(), traits,
+        uncollect.getRowType(),
+        project.getProjects().iterator().next());
+    call.transformTo(unnest);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java
new file mode 100644
index 0000000..9d308f0
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.planner.common.DrillCorrelateRelBase;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Physical ({@link Prel}) form of a correlate, built on top of
+ * {@link DrillCorrelateRelBase}; it is materialized as a
+ * {@link LateralJoinPOP}.
+ */
+public class CorrelatePrel extends DrillCorrelateRelBase implements Prel {
+
+  /**
+   * Creates the node; every argument is forwarded unchanged to the base class.
+   */
+  protected CorrelatePrel(RelOptCluster cluster, RelTraitSet traits,
+      RelNode left, RelNode right, CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
+    super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+  }
+
+  /**
+   * Creates a copy of this node using the supplied traits, inputs, correlation
+   * id, required columns and join type.
+   *
+   * <p>The {@code traitSet} and {@code joinType} arguments are honoured rather
+   * than replaced by this node's own values, so a caller asking for a copy
+   * with different traits actually gets one.
+   */
+  @Override
+  public Correlate copy(RelTraitSet traitSet,
+      RelNode left, RelNode right, CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+    return new CorrelatePrel(getCluster(), traitSet, left, right,
+        correlationId, requiredColumns, joinType);
+  }
+
+  /**
+   * Builds the physical operators of the left and right inputs (in that
+   * order) and joins them with a {@link LateralJoinPOP} whose join type is
+   * derived from this node's {@link SemiJoinType}.
+   */
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+    PhysicalOperator leftPop = ((Prel) left).getPhysicalOperator(creator);
+    PhysicalOperator rightPop = ((Prel) right).getPhysicalOperator(creator);
+
+    SemiJoinType jtype = this.getJoinType();
+
+    LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType());
+    return creator.addMetadata(this, ljoin);
+  }
+
+  /** Dispatches to the visitor's generic {@code visitPrel} callback. */
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
+    return visitor.visitPrel(this, value);
+  }
+
+  /** Iterates over the left input followed by the right input. */
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getLeft(), getRight());
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return true;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+    return BatchSchema.SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getEncoding() {
+    return BatchSchema.SelectionVectorMode.NONE;
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrule.java
new file mode 100644
index 0000000..4f1e1d8
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrule.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.logical.DrillCorrelateRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+/**
+ * Physical planning rule that converts a {@link DrillCorrelateRel} into a
+ * {@link CorrelatePrel}.
+ */
+public class CorrelatePrule extends Prule {
+  public static final RelOptRule INSTANCE =
+      new CorrelatePrule("Prel.CorrelatePrule", RelOptHelper.any(DrillCorrelateRel.class));
+
+  private CorrelatePrule(String name, RelOptRuleOperand operand) {
+    super(operand, name);
+  }
+
+  /**
+   * Requests both inputs with the {@code DRILL_PHYSICAL} trait and emits a
+   * {@link CorrelatePrel} whose traits are the left input's physical traits
+   * plus {@code RANDOM_DISTRIBUTED}.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillCorrelateRel logicalRel = call.rel(0);
+    final RelNode leftInput = logicalRel.getLeft();
+    final RelNode rightInput = logicalRel.getRight();
+
+    final RelTraitSet leftPhysical = leftInput.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    final RelTraitSet rightPhysical = rightInput.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    final RelTraitSet outputTraits =
+        leftPhysical.plus(DrillDistributionTrait.RANDOM_DISTRIBUTED);
+
+    // Left input is converted before the right one.
+    final RelNode physicalLeft = convert(leftInput, leftPhysical);
+    final RelNode physicalRight = convert(rightInput, rightPhysical);
+
+    call.transformTo(new CorrelatePrel(
+        logicalRel.getCluster(),
+        outputTraits,
+        physicalLeft,
+        physicalRight,
+        logicalRel.getCorrelationId(),
+        logicalRel.getRequiredColumns(),
+        logicalRel.getJoinType()));
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
new file mode 100644
index 0000000..1e87305
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Physical ({@link Prel}) form of the unnest node, built on top of
+ * {@link DrillUnnestRelBase}. It has no inputs and is materialized as the
+ * {@link UnnestPOP} created in the constructor.
+ */
+public class UnnestPrel extends DrillUnnestRelBase implements Prel {
+
+  protected final UnnestPOP unnestPOP;
+
+  /**
+   * Creates the node and the {@link UnnestPOP} it will later hand out.
+   *
+   * @param cluster cluster this node belongs to
+   * @param traits  trait set of the new node
+   * @param rowType row type stored on this node
+   * @param ref     expression being unnested; must be a {@link RexFieldAccess},
+   *                whose field name becomes the unnest column
+   * @throws IllegalArgumentException if {@code ref} is not a {@link RexFieldAccess}
+   */
+  public UnnestPrel(RelOptCluster cluster, RelTraitSet traits,
+                    RelDataType rowType, RexNode ref) {
+    super(cluster, traits, ref);
+    // Fail with a descriptive message instead of an opaque ClassCastException / NPE.
+    if (!(ref instanceof RexFieldAccess)) {
+      throw new IllegalArgumentException(
+          "UnnestPrel requires a RexFieldAccess reference but got: " + ref);
+    }
+    final String columnName = ((RexFieldAccess) ref).getField().getName();
+    this.unnestPOP = new UnnestPOP(null, SchemaPath.getSimplePath(columnName));
+    this.rowType = rowType;
+  }
+
+  /** This node has no inputs, so the iterator is always empty. */
+  @Override
+  public Iterator<Prel> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  /** Dispatches to the visitor's generic {@code visitPrel} callback. */
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
+    return visitor.visitPrel(this, value);
+  }
+
+  /** Returns the {@link UnnestPOP} built in the constructor, with metadata added by the creator. */
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
+      throws IOException {
+    return creator.addMetadata(this, unnestPOP);
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+    return BatchSchema.SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getEncoding() {
+    return BatchSchema.SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return true;
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrule.java
new file mode 100644
index 0000000..765304d
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.planner.logical.DrillUnnestRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+/**
+ * Physical planning rule that converts a {@link DrillUnnestRel} into an
+ * {@link UnnestPrel}.
+ */
+public class UnnestPrule extends Prule {
+  public static final RelOptRule INSTANCE = new UnnestPrule();
+
+  private UnnestPrule() {
+    super(RelOptHelper.any(DrillUnnestRel.class), "UnnestPrule");
+  }
+
+  /**
+   * Emits an {@link UnnestPrel} carrying the logical node's row type and
+   * reference expression, with the {@code DRILL_PHYSICAL} trait added.
+   *
+   * <p>The reference is passed through as-is; {@link UnnestPrel} casts it to
+   * {@link RexFieldAccess} when it is constructed.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillUnnestRel unnest = call.rel(0);
+    final RexNode ref = unnest.getRef();
+
+    UnnestPrel unnestPrel = new UnnestPrel(unnest.getCluster(),
+        unnest.getTraitSet().plus(Prel.DRILL_PHYSICAL), unnest.getRowType(), ref);
+
+    call.transformTo(unnestPrel);
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPhysicalPlan.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPhysicalPlan.java
new file mode 100644
index 0000000..09d85d4
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPhysicalPlan.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.lateraljoin;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.drill.test.BaseTestQuery;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests for lateral join / unnest: one hand-written physical plan and several
+ * SQL queries against the {@code lateraljoin/nested-customer} fixtures.
+ */
+public class TestLateralPhysicalPlan extends BaseTestQuery {
+
+  /**
+   * Runs the physical plan in {@code lateraljoin/lateralplan1.json} and checks
+   * the number of returned records.
+   */
+  @Test
+  public void testLateralPlan1() throws Exception {
+    int numOutputRecords = testPhysical(getFile("lateraljoin/lateralplan1.json"));
+    // JUnit expects (expected, actual); presumably 4 customers x 3 orders,
+    // mirroring nested-customer.json -- confirm against the parquet fixture.
+    assertEquals(12, numOutputRecords);
+  }
+
+  /** {@code select *} over a table laterally joined with {@code unnest}. */
+  @Test
+  @Ignore("To be fixed")
+  public void testLateralSqlStar() throws Exception {
+    String sql = "select * from cp.`lateraljoin/nested-customer.json` t, "
+        + "unnest(t.orders) t2 limit 1";
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(sql)
+        .baselineColumns("c_name", "o_shop")
+        .baselineValues("customer1", "Meno Park 1st")
+        .go();
+
+  }
+
+  /** Projects one column from the table and one from the unnested orders. */
+  @Test
+  public void testLateralSql() throws Exception {
+    String sql = "select t.c_name, t2.o_shop as o_shop from "
+        + "cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2 limit 1";
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(sql)
+        .baselineColumns("c_name", "o_shop")
+        .baselineValues("customer1", "Meno Park 1st")
+        .go();
+
+  }
+
+  /** Unnests an array of plain values ({@code c_phone}) rather than an array of maps. */
+  @Test
+  @Ignore("naming of single column")
+  public void testLateralSqlPlainCol() throws Exception {
+    String sql = "select t.c_name, t2.c_phone from "
+        + "cp.`lateraljoin/nested-customer.json` t, unnest(t.c_phone) t2 limit 1";
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(sql)
+        .baselineColumns("c_name", "c_phone_flat")
+        .baselineValues("customer1", "6505200001")
+        .go();
+
+  }
+
+  /** Uses an explicit {@code as t2(o_shop)} alias on the unnest. */
+  @Test
+  @Ignore("To be fixed")
+  public void testLateralSqlWithAS() throws Exception {
+    String sql = "select t.c_name, t2.o_shop from "
+        + "cp.`lateraljoin/nested-customer.parquet` t, unnest(t.orders) as t2(o_shop) "
+        + "limit 1";
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(sql)
+        .baselineColumns("c_name", "o_shop")
+        .baselineValues("customer1", "Meno Park 1st")
+        .go();
+
+  }
+
+  /** Only checks that a star over a sub-query column runs; the result is not verified. */
+  @Test
+  public void testSubQuerySql() throws Exception {
+    String sql = "select t2.os.* from (select t.orders as os from "
+        + "cp.`lateraljoin/nested-customer.parquet` t) t2";
+    test(sql);
+  }
+}
diff --git a/exec/java-exec/src/test/resources/lateraljoin/lateralplan1.json 
b/exec/java-exec/src/test/resources/lateraljoin/lateralplan1.json
new file mode 100644
index 0000000..9be5878
--- /dev/null
+++ b/exec/java-exec/src/test/resources/lateraljoin/lateralplan1.json
@@ -0,0 +1,95 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "DefaultSqlHandler",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "options" : [ {
+      "kind" : "DOUBLE",
+      "accessibleScopes" : "ALL",
+      "name" : "planner.index.noncovering_selectivity_threshold",
+      "float_val" : 0.25,
+      "scope" : "SESSION"
+    } ],
+    "queue" : 0,
+    "hasResourcePlan" : false,
+    "resultMode" : "EXEC"
+  },
+  "graph" : [ {
+    "pop" : "parquet-scan",
+    "@id" : 5,
+    "userName" : "root",
+    "entries" : [ {
+      "path" : "lateraljoin/nested-customer.parquet"
+    } ],
+    "storage" : {
+      "type" : "file",
+      "enabled" : true,
+      "connection" : "classpath:///",
+      "formats" : {
+        "json" : {
+          "type" : "json"
+        },
+        "parquet" : {
+          "type" : "parquet"
+        }
+      }
+    },
+    "format" : {
+      "type" : "parquet"
+    },
+    "cost" : 1000.0
+  }, {
+    "pop" : "unnest",
+    "@id" : 7,
+    "userName" : "root",
+    "column" : "`orders`",
+    "cost" : 1000.0
+  }, {
+    "pop" : "project",
+    "@id" : 6,
+    "exprs" : [ {
+      "ref" : "`ITEM`",
+      "expr" : "`o_shop`"
+    }],
+    "child" : 7,
+    "outputProj" : false,
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000,
+    "cost" : 1000.0
+  }, {
+    "pop" : "lateral-join",
+    "@id" : 4,
+    "left" : 5,
+    "right" : 6,
+    "unnestForLateralJoin": 7,
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000,
+    "cost" : 1000.0
+  }, {
+    "pop" : "project",
+    "@id" : 3,
+    "exprs" : [ {
+      "ref" : "`ITEM`",
+      "expr" : "`c_name`"
+    }, {
+      "ref" : "`ITEM1`",
+      "expr" : "`o_shop`"
+    } ],
+    "child" : 4,
+    "outputProj" : false,
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000,
+    "cost" : 1000.0
+  },  {
+    "pop" : "screen",
+    "@id" : 0,
+    "child" : 3,
+    "initialAllocation" : 1000000,
+    "maxAllocation" : 10000000000,
+    "cost" : 1000.0
+  } ]
+}
+
diff --git a/exec/java-exec/src/test/resources/lateraljoin/nested-customer.json 
b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.json
new file mode 100644
index 0000000..710ca42
--- /dev/null
+++ b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.json
@@ -0,0 +1,114 @@
+{
+  "c_name" : "customer1",
+  "c_id" : 1,
+  "c_phone" : ["6505200001", "4085201234", "6125205678"],
+  "orders" : [{"o_id": 1, "o_shop": "Meno Park 1st", "o_amount": 4.5,
+               "items" : [ {"i_name" : "paper towel", "i_number": 2, 
"i_supplier": "oregan"},
+                           {"i_name" : "map", "i_number": 1, "i_supplier": 
"washington"},
+                           {"i_name" : "cheese", "i_number": 9, "i_supplier": 
"california"}
+                         ]
+
+              },
+              {"o_id": 2, "o_shop": "Mountain View 1st", "o_amount": 104.5,
+               "items" : [ {"i_name" : "beef", "i_number": 3, "i_supplier": 
"montana"},
+                           {"i_name" : "tooth paste", "i_number": 4, 
"i_supplier": "washington"},
+                           {"i_name" : "hat", "i_number": 7, "i_supplier": 
"california"}
+                         ]
+
+              },
+              {"o_id": 3, "o_shop": "Sunnyvale 1st", "o_amount": 294.5,
+               "items" : [ {"i_name" : "paper towel", "i_number": 5, 
"i_supplier": "oregan"},
+                           {"i_name" : "tooth paste", "i_number": 6, 
"i_supplier": "washington"},
+                           {"i_name" : "cheese", "i_number": 8, "i_supplier": 
"california"}
+                         ]
+              }
+             ],
+  "c_address" : "bay area, CA"
+}
+{
+  "c_name" : "customer2",
+  "c_id" : 2,
+  "c_phone" : ["1505200001", "7085201234", "2125205678"],
+  "orders" : [{"o_id": 10, "o_shop": "Mountain View 1st", "o_amount": 724.5,
+               "items" : [ {"i_name" : "beef", "i_number": 12, "i_supplier": 
"montana"},
+                           {"i_name" : "tooth paste", "i_number": 11, 
"i_supplier": "washington"},
+                           {"i_name" : "hat", "i_number": 10, "i_supplier": 
"california"}
+                         ]
+
+              },
+
+              {"o_id": 11, "o_shop": "Sunnyvale 1st", "o_amount": 179.5,
+               "items" : [ {"i_name" : "paper towel", "i_number": 13, 
"i_supplier": "oregan"},
+                           {"i_name" : "tooth paste", "i_number": 14, 
"i_supplier": "washington"},
+                           {"i_name" : "cheese", "i_number": 15, "i_supplier": 
"california"}
+                         ]
+              },
+              {"o_id": 12, "o_shop": "Meno Park 1st", "o_amount": 80.0,
+               "items" : [ {"i_name" : "paper towel", "i_number": 13, 
"i_supplier": "oregan"},
+                           {"i_name" : "tooth paste", "i_number": 14, 
"i_supplier": "washington"},
+                           {"i_name" : "cheese", "i_number": 15, "i_supplier": 
"california"}
+                         ]
+              }
+             ],
+  "c_address" : "LA, CA"
+}
+{
+  "c_name" : "customer3",
+  "c_id" : 3,
+  "c_phone" : ["1205200001", "7285201234", "2325205678"],
+  "orders" : [{"o_id": 21, "o_shop": "Meno Park 1st", "o_amount": 192.5,
+               "items" : [ {"i_name" : "beef", "i_number": 22, "i_supplier": 
"montana"},
+                           {"i_name" : "tooth paste", "i_number": 21, 
"i_supplier": "washington"},
+                           {"i_name" : "hat", "i_number": 20, "i_supplier": 
"california"}
+                         ]
+
+              },
+
+              {"o_id": 22, "o_shop": "Mountain View 1st", "o_amount": 680.9,
+               "items" : [ {"i_name" : "paper towel", "i_number": 23, 
"i_supplier": "oregan"},
+                           {"i_name" : "tooth paste", "i_number": 24, 
"i_supplier": "washington"},
+                           {"i_name" : "cheese", "i_number": 25, "i_supplier": 
"california"}
+                         ]
+              },
+
+              {"o_id": 23, "o_shop": "Sunnyvale 1st", "o_amount": 772.2,
+               "items" : [ {"i_name" : "paper towel", "i_number": 26, 
"i_supplier": "oregan"},
+                           {"i_name" : "tooth paste", "i_number": 27, 
"i_supplier": "washington"},
+                           {"i_name" : "cheese", "i_number": 28, "i_supplier": 
"california"}
+                         ]
+              }
+
+             ],
+  "c_address" : "bay area, CA"
+}
+{
+  "c_name" : "customer4",
+  "c_id" : 4,
+  "c_phone" : ["6509200001", "4088201234", "6127205678"],
+  "orders" : [{"o_id": 30, "o_shop": "Mountain View 1st", "o_amount": 870.2,
+               "items" : [ {"i_name" : "beef", "i_number": 32, "i_supplier": 
"montana"},
+                           {"i_name" : "tooth paste", "i_number": 31, 
"i_supplier": "washington"},
+                           {"i_name" : "hat", "i_number": 30, "i_supplier": 
"california"}
+                         ]
+
+              },
+
+              {"o_id": 31, "o_shop": "Sunnyvale 1st", "o_amount": 970.5,
+               "items" : [ {"i_name" : "beef", "i_number": 32, "i_supplier": 
"montana"},
+                           {"i_name" : "tooth paste", "i_number": 31, 
"i_supplier": "washington"},
+                           {"i_name" : "cheese", "i_number": 30, "i_supplier": 
"california"}
+                         ]
+
+              },
+
+              {"o_id": 32, "o_shop": "Meno Park 1st", "o_amount": 1030.1,
+               "items" : [ {"i_name" : "paper towel", "i_number": 36, 
"i_supplier": "oregan"},
+                           {"i_name" : "tooth paste", "i_number": 37, 
"i_supplier": "washington"},
+                           {"i_name" : "cheese", "i_number": 38, "i_supplier": 
"california"}
+                         ]
+              }
+
+             ],
+  "c_address" : "LA, CA"
+}
+
diff --git 
a/exec/java-exec/src/test/resources/lateraljoin/nested-customer.parquet 
b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.parquet
new file mode 100644
index 0000000..97d898a
Binary files /dev/null and 
b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.parquet differ
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
 b/logical/src/main/java/org/apache/drill/common/logical/data/LateralJoin.java
similarity index 50%
copy from 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
copy to 
logical/src/main/java/org/apache/drill/common/logical/data/LateralJoin.java
index 946b4a6..b2c78a4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++ 
b/logical/src/main/java/org/apache/drill/common/logical/data/LateralJoin.java
@@ -15,41 +15,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.drill.exec.physical.config;
+package org.apache.drill.common.logical.data;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.drill.exec.physical.base.AbstractJoinPop;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
 
+import java.util.Iterator;
 import java.util.List;
 
 @JsonTypeName("lateral-join")
-public class LateralJoinPOP extends AbstractJoinPop {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class);
+public class LateralJoin extends LogicalOperatorBase {
+  private final LogicalOperator left;
+  private final LogicalOperator right;
 
   @JsonCreator
-  public LateralJoinPOP(
-      @JsonProperty("left") PhysicalOperator left,
-      @JsonProperty("right") PhysicalOperator right,
-      @JsonProperty("joinType") JoinRelType joinType) {
-    super(left, right, joinType, null, null);
+  public LateralJoin(@JsonProperty("left") LogicalOperator left, @JsonProperty("right") LogicalOperator right) {
+    super();
+    this.left = left;
+    this.right = right;
+    left.registerAsSubscriber(this);
+    right.registerAsSubscriber(this);
+  }
+
+  public LogicalOperator getLeft() {
+    return left;
+  }
+
+  public LogicalOperator getRight() {
+    return right;
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    Preconditions.checkArgument(children.size() == 2,
-      "Lateral join should have two physical operators");
-    return new LateralJoinPOP(children.get(0), children.get(1), joinType);
+  public Iterator<LogicalOperator> iterator() {
+    return Iterators.forArray(getLeft(), getRight());
   }
 
   @Override
-  public int getOperatorType() {
-    return CoreOperatorType.LATERAL_JOIN_VALUE;
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitLateralJoin(this, value);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java b/logical/src/main/java/org/apache/drill/common/logical/data/Unnest.java
similarity index 51%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
copy to logical/src/main/java/org/apache/drill/common/logical/data/Unnest.java
index 6ed593d..a70e8cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/Unnest.java
@@ -15,20 +15,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.impl.join;
+package org.apache.drill.common.logical.data;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.config.LateralJoinPOP;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.record.RecordBatch;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
 
-import java.util.List;
+@JsonTypeName("unnest")
+public class Unnest extends SingleInputOperator {
+
+  private final SchemaPath column;
+
+  @JsonCreator
+  public Unnest(@JsonProperty("column") SchemaPath column) {
+    this.column = column;
+  }
+
+  public SchemaPath getColumn() {
+    return column;
+  }
 
-public class LateralJoinBatchCreator implements BatchCreator<LateralJoinPOP> {
   @Override
-  public LateralJoinBatch getBatch(ExecutorFragmentContext context, LateralJoinPOP config, List<RecordBatch> children)
-      throws ExecutionSetupException {
-    return new LateralJoinBatch(config, context, children.get(0), children.get(1));
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitUnnest(this, value);
   }
+
 }
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
index a04c3a4..4fd64c5 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.common.logical.data.visitors;
 
+import org.apache.drill.common.logical.data.LateralJoin;
+import org.apache.drill.common.logical.data.Unnest;
 import org.apache.drill.common.logical.data.Values;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.Flatten;
@@ -123,4 +125,14 @@ public abstract class AbstractLogicalVisitor<T, X, E extends Throwable> implemen
     public T visitWriter(Writer writer, X value) throws E {
       return visitOp(writer, value);
     }
+
+    @Override
+    public T visitUnnest(Unnest unnest, X value) throws E {
+      return visitOp(unnest, value);
+    }
+
+    @Override
+    public T visitLateralJoin(LateralJoin lateralJoin, X value) throws E {
+        return visitOp(lateralJoin, value);
+    }
 }
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
index 5b5ca5f..55fa838 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
@@ -18,6 +18,8 @@
 package org.apache.drill.common.logical.data.visitors;
 
 
+import org.apache.drill.common.logical.data.LateralJoin;
+import org.apache.drill.common.logical.data.Unnest;
 import org.apache.drill.common.logical.data.Values;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.Flatten;
@@ -63,4 +65,7 @@ public interface LogicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
     public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
     public RETURN visitWindow(Window window, EXTRA value) throws EXCEP;
     public RETURN visitWriter(Writer writer, EXTRA value) throws EXCEP;
+
+    public RETURN visitUnnest(Unnest unnest, EXTRA value) throws EXCEP;
+    public RETURN visitLateralJoin(LateralJoin lateralJoin, EXTRA value) throws EXCEP;
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to