[
https://issues.apache.org/jira/browse/DRILL-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476287#comment-16476287
]
ASF GitHub Bot commented on DRILL-6321:
---------------------------------------
parthchandra closed pull request #1258: DRILL-6321: Lateral Join and Unnest -
initial implementation for parser and planning
URL: https://github.com/apache/drill/pull/1258
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 9b04e94732..36e74a0942 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -37,6 +37,7 @@
import org.apache.drill.common.logical.data.Scan;
import org.apache.drill.common.logical.data.SinkOperator;
import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.common.logical.data.Unnest;
import org.apache.drill.common.logical.data.Window;
import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
import org.apache.drill.common.types.TypeProtos;
@@ -238,5 +239,10 @@ public PhysicalOperator visitFilter(final Filter filter,
final Object obj) throw
final PhysicalOperator child = filter.getInput().accept(this, obj);
return new SelectionVectorRemover(new
org.apache.drill.exec.physical.config.Filter(child, filter.getExpr(), 1.0f));
}
+
+ @Override
+ public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj)
throws OptimizerException {
+ return new org.apache.drill.exec.physical.config.UnnestPOP(null,
unnest.getColumn());
+ }
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 86a31c1972..340c303f75 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -23,6 +23,7 @@
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.physical.config.OrderedPartitionSender;
@@ -180,6 +181,11 @@ public T visitUnnest(UnnestPOP unnest, X value) throws E {
return visitOp(unnest, value);
}
+ @Override
+ public T visitLateralJoin(LateralJoinPOP lateralJoinPOP, X value) throws E {
+ return visitOp(lateralJoinPOP, value);
+ }
+
@Override
public T visitIteratorValidator(IteratorValidator op, X value) throws E {
return visitOp(op, value);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 5c926adf12..f2e53eb03e 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -23,6 +23,7 @@
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.physical.config.OrderedPartitionSender;
@@ -78,6 +79,7 @@
public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP;
public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws
EXCEP;
public RETURN visitUnnest(UnnestPOP unnest, EXTRA value) throws EXCEP;
+ public RETURN visitLateralJoin(LateralJoinPOP lateralJoinPOP, EXTRA value)
throws EXCEP;
public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value)
throws EXCEP;
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index 946b4a6871..fab89a24e4 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -25,6 +25,7 @@
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.exec.physical.base.AbstractJoinPop;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import java.util.List;
@@ -33,6 +34,9 @@
public class LateralJoinPOP extends AbstractJoinPop {
static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class);
+ @JsonProperty("unnestForLateralJoin")
+ private UnnestPOP unnestForLateralJoin;
+
@JsonCreator
public LateralJoinPOP(
@JsonProperty("left") PhysicalOperator left,
@@ -45,11 +49,27 @@ public LateralJoinPOP(
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.size() == 2,
"Lateral join should have two physical operators");
- return new LateralJoinPOP(children.get(0), children.get(1), joinType);
+ LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0),
children.get(1), joinType);
+ newPOP.unnestForLateralJoin = this.unnestForLateralJoin;
+ return newPOP;
+ }
+
+ @JsonProperty("unnestForLateralJoin")
+ public UnnestPOP getUnnestForLateralJoin() {
+ return this.unnestForLateralJoin;
+ }
+
+ public void setUnnestForLateralJoin(UnnestPOP unnest) {
+ this.unnestForLateralJoin = unnest;
}
@Override
public int getOperatorType() {
return CoreOperatorType.LATERAL_JOIN_VALUE;
}
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E>
physicalVisitor, X value) throws E {
+ return physicalVisitor.visitLateralJoin(this, value);
+ }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
index 6e351343cd..f95481821c 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
@@ -18,36 +18,50 @@
package org.apache.drill.exec.physical.config;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Iterators;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.Leaf;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.impl.unnest.UnnestRecordBatch;
import java.util.Iterator;
+import java.util.List;
import static
org.apache.drill.exec.proto.UserBitShared.CoreOperatorType.UNNEST_VALUE;
@JsonTypeName("unnest")
-public class UnnestPOP extends AbstractSingle {
+public class UnnestPOP extends AbstractBase implements Leaf {
static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(UnnestPOP.class);
private SchemaPath column;
+ @JsonIgnore
+ private UnnestRecordBatch unnestBatch;
+
@JsonCreator
public UnnestPOP(
@JsonProperty("child") PhysicalOperator child, // Operator with incoming
record batch
@JsonProperty("column") SchemaPath column) {
- super(child);
this.column = column;
}
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
throws ExecutionSetupException {
+ assert children.isEmpty();
+ UnnestPOP newUnnest = new UnnestPOP(null, column);
+ newUnnest.addUnnestBatch(this.unnestBatch);
+ return newUnnest;
+ }
@Override
public Iterator<PhysicalOperator> iterator() {
- return Iterators.singletonIterator(child);
+ return Iterators.emptyIterator();
}
public SchemaPath getColumn() {
@@ -59,10 +73,13 @@ public SchemaPath getColumn() {
return physicalVisitor.visitUnnest(this, value);
}
- @Override
- public PhysicalOperator getNewWithChild(PhysicalOperator child) {
- UnnestPOP unnest = new UnnestPOP(child, column);
- return unnest;
+ public void addUnnestBatch(UnnestRecordBatch unnestBatch) {
+ this.unnestBatch = unnestBatch;
+ }
+
+ @JsonIgnore
+ public UnnestRecordBatch getUnnestBatch() {
+ return this.unnestBatch;
}
@Override
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
index 6ed593dfec..f868596d22 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java
@@ -19,7 +19,9 @@
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.base.LateralContract;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.physical.config.UnnestPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -29,6 +31,11 @@
@Override
public LateralJoinBatch getBatch(ExecutorFragmentContext context,
LateralJoinPOP config, List<RecordBatch> children)
throws ExecutionSetupException {
- return new LateralJoinBatch(config, context, children.get(0),
children.get(1));
+ LateralJoinBatch ljBatch = new LateralJoinBatch(config, context,
children.get(0), children.get(1));
+ UnnestPOP unnest = config.getUnnestForLateralJoin();
+ if (unnest != null) {
+ unnest.getUnnestBatch().setIncoming((LateralContract)ljBatch);
+ }
+ return ljBatch;
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index d366c80e90..7e6b141278 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -37,9 +37,11 @@
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import java.util.Iterator;
import java.util.List;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
@@ -81,6 +83,7 @@ public int metricId() {
}
}
+
/**
* Memory manager for Unnest. Estimates the batch size exactly like we do
for Flatten.
*/
@@ -134,6 +137,7 @@ public void update() {
public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws
OutOfMemoryException {
super(pop, context);
+ pop.addUnnestBatch(this);
// get the output batch size from config.
int configuredBatchSize = (int)
context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
memoryManager = new UnnestMemoryManager(configuredBatchSize);
@@ -166,7 +170,6 @@ protected void killIncoming(boolean sendUpstream) {
hasRemainder = false; // whatever the case, we need to stop processing the
current row.
}
-
@Override
public IterOutcome innerNext() {
@@ -261,7 +264,6 @@ public VectorContainer getOutgoingContainer() {
unnest.setUnnestField(vector);
}
- @Override
protected IterOutcome doWork() {
Preconditions.checkNotNull(lateral);
memoryManager.update();
@@ -355,7 +357,15 @@ protected IterOutcome doWork() {
final ValueVector unnestVector = transferPair.getTo();
transfers.add(transferPair);
- container.add(unnestVector);
+ if (unnestVector instanceof MapVector) {
+ Iterator<ValueVector> it = unnestVector.iterator();
+ while (it.hasNext()) {
+ container.add(it.next());
+ }
+ }
+ else {
+ container.add(unnestVector);
+ }
logger.debug("Added transfer for unnest expression.");
container.buildSchema(SelectionVectorMode.NONE);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 17fedc477d..b5e09ef767 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -28,6 +28,7 @@
import org.apache.calcite.tools.RuleSets;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.planner.logical.DrillAggregateRule;
+import org.apache.drill.exec.planner.logical.DrillCorrelateRule;
import org.apache.drill.exec.planner.logical.DrillFilterAggregateTransposeRule;
import org.apache.drill.exec.planner.logical.DrillFilterItemStarReWriterRule;
import org.apache.drill.exec.planner.logical.DrillFilterJoinRules;
@@ -48,11 +49,13 @@
import org.apache.drill.exec.planner.logical.DrillScanRule;
import org.apache.drill.exec.planner.logical.DrillSortRule;
import org.apache.drill.exec.planner.logical.DrillUnionAllRule;
+import org.apache.drill.exec.planner.logical.DrillUnnestRule;
import org.apache.drill.exec.planner.logical.DrillValuesRule;
import org.apache.drill.exec.planner.logical.DrillWindowRule;
import org.apache.drill.exec.planner.logical.partition.ParquetPruneScanRule;
import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
import org.apache.drill.exec.planner.physical.ConvertCountToDirectScan;
+import org.apache.drill.exec.planner.physical.CorrelatePrule;
import org.apache.drill.exec.planner.physical.DirectScanPrule;
import org.apache.drill.exec.planner.physical.FilterPrule;
import org.apache.drill.exec.planner.physical.HashAggPrule;
@@ -70,6 +73,7 @@
import org.apache.drill.exec.planner.physical.SortPrule;
import org.apache.drill.exec.planner.physical.StreamAggPrule;
import org.apache.drill.exec.planner.physical.UnionAllPrule;
+import org.apache.drill.exec.planner.physical.UnnestPrule;
import org.apache.drill.exec.planner.physical.ValuesPrule;
import org.apache.drill.exec.planner.physical.WindowPrule;
import org.apache.drill.exec.planner.physical.WriterPrule;
@@ -275,6 +279,7 @@ static RuleSet
getDrillUserConfigurableLogicalRules(OptimizerRulesContext optimi
DrillFilterAggregateTransposeRule.INSTANCE,
RuleInstance.FILTER_MERGE_RULE,
+ RuleInstance.FILTER_CORRELATE_RULE,
RuleInstance.AGGREGATE_REMOVE_RULE,
RuleInstance.PROJECT_REMOVE_RULE,
RuleInstance.SORT_REMOVE_RULE,
@@ -307,7 +312,9 @@ static RuleSet
getDrillUserConfigurableLogicalRules(OptimizerRulesContext optimi
DrillSortRule.INSTANCE,
DrillJoinRule.INSTANCE,
DrillUnionAllRule.INSTANCE,
- DrillValuesRule.INSTANCE
+ DrillValuesRule.INSTANCE,
+ DrillUnnestRule.INSTANCE,
+ DrillCorrelateRule.INSTANCE
).build();
/**
@@ -443,6 +450,9 @@ static RuleSet getPhysicalRules(OptimizerRulesContext
optimizerRulesContext) {
ruleList.add(ValuesPrule.INSTANCE);
ruleList.add(DirectScanPrule.INSTANCE);
+ ruleList.add(UnnestPrule.INSTANCE);
+ ruleList.add(CorrelatePrule.INSTANCE);
+
ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_PROJECT);
ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_SCAN);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 49feb41653..986fdbd4d1 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -27,6 +27,7 @@
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
import org.apache.calcite.rel.rules.AggregateRemoveRule;
+import org.apache.calcite.rel.rules.FilterCorrelateRule;
import org.apache.calcite.rel.rules.FilterMergeRule;
import org.apache.calcite.rel.rules.FilterSetOpTransposeRule;
import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
@@ -62,6 +63,9 @@
FilterMergeRule FILTER_MERGE_RULE =
new FilterMergeRule(DrillRelFactories.LOGICAL_BUILDER);
+ FilterCorrelateRule FILTER_CORRELATE_RULE =
+ new FilterCorrelateRule(DrillRelFactories.LOGICAL_BUILDER);
+
AggregateRemoveRule AGGREGATE_REMOVE_RULE =
new AggregateRemoveRule(LogicalAggregate.class,
DrillRelFactories.LOGICAL_BUILDER);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillCorrelateRelBase.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillCorrelateRelBase.java
new file mode 100644
index 0000000000..ea994ba9f5
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillCorrelateRelBase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+
+
+public abstract class DrillCorrelateRelBase extends Correlate implements
DrillRelNode {
+ public DrillCorrelateRelBase(RelOptCluster cluster, RelTraitSet traits,
RelNode left, RelNode right,
+ CorrelationId correlationId, ImmutableBitSet
requiredColumns, SemiJoinType semiJoinType) {
+ super(cluster, traits, left, right, correlationId, requiredColumns,
semiJoinType);
+ }
+
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
+ RelMetadataQuery mq) {
+ DrillCostBase.DrillCostFactory costFactory =
(DrillCostBase.DrillCostFactory) planner.getCostFactory();
+
+ double rowCount = mq.getRowCount(this.getLeft());
+ long fieldWidth = PrelUtil.getPlannerSettings(planner).getOptions()
+ .getOption(ExecConstants.AVERAGE_FIELD_WIDTH_KEY).num_val;
+
+ double rowSize = (this.getLeft().getRowType().getFieldList().size()) *
fieldWidth;
+
+ double cpuCost = rowCount * rowSize * DrillCostBase.BASE_CPU_COST;
+ double memCost = 0;
+ return costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost);
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
index 06f02c0533..10c4738bab 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
@@ -111,11 +111,11 @@ public double estimateRowCount(RelMetadataQuery mq) {
return new HashSet<>(left).removeAll(right);
}
- protected boolean uniqueFieldNames(RelDataType rowType) {
+ public static boolean uniqueFieldNames(RelDataType rowType) {
return isUnique(rowType.getFieldNames());
}
- protected static <T> boolean isUnique(List<T> list) {
+ public static <T> boolean isUnique(List<T> list) {
return new HashSet<>(list).size() == list.size();
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java
new file mode 100644
index 0000000000..04bb2d68a1
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+
+public class DrillUnnestRelBase extends AbstractRelNode implements
DrillRelNode {
+
+ final protected RexNode ref;
+
+ public DrillUnnestRelBase(RelOptCluster cluster, RelTraitSet traitSet,
RexNode ref) {
+ super(cluster, traitSet);
+ this.ref = ref;
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery
mq) {
+
+ double rowCount = mq.getRowCount(this);
+ // Attribute small cost for projecting simple fields. In reality
projecting simple columns in not free and
+ // this allows projection pushdown/project-merge rules to kick-in thereby
eliminating unneeded columns from
+ // the projection.
+ double cpuCost = DrillCostBase.BASE_CPU_COST * rowCount *
this.getRowType().getFieldCount();
+
+ DrillCostBase.DrillCostFactory costFactory =
(DrillCostBase.DrillCostFactory) planner.getCostFactory();
+ return costFactory.makeCost(rowCount, cpuCost, 0, 0);
+ }
+
+ public RexNode getRef() {
+ return this.ref;
+ }
+}
\ No newline at end of file
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 2fc754179b..987e65c243 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -30,6 +30,8 @@
import org.apache.drill.exec.physical.base.SubScan;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.physical.config.UnnestPOP;
public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator,
Materializer.IndexedFragmentNode, ExecutionSetupException>{
static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(Materializer.class);
@@ -106,10 +108,38 @@ public PhysicalOperator visitOp(PhysicalOperator op,
IndexedFragmentNode iNode)
return newOp;
}
+
+ @Override
+ public PhysicalOperator visitLateralJoin(LateralJoinPOP op,
IndexedFragmentNode iNode) throws ExecutionSetupException {
+ iNode.addAllocation(op);
+ List<PhysicalOperator> children = Lists.newArrayList();
+
+ children.add(op.getLeft().accept(this, iNode));
+ children.add(op.getRight().accept(this, iNode));
+ UnnestPOP unnestInLeftInput = iNode.getUnnest();
+
+ PhysicalOperator newOp = op.getNewWithChildren(children);
+ newOp.setCost(op.getCost());
+ newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId());
+
+ ((LateralJoinPOP)newOp).setUnnestForLateralJoin(unnestInLeftInput);
+
+ return newOp;
+ }
+
+ @Override
+ public PhysicalOperator visitUnnest(UnnestPOP unnest, IndexedFragmentNode
value) throws ExecutionSetupException {
+ PhysicalOperator newOp = visitOp(unnest, value);
+ value.addUnnest((UnnestPOP)newOp);
+ return newOp;
+ }
+
public static class IndexedFragmentNode{
final Wrapper info;
final int minorFragmentId;
+ UnnestPOP unnest = null;
+
public IndexedFragmentNode(int minorFragmentId, Wrapper info) {
super();
this.info = info;
@@ -132,6 +162,14 @@ public void addAllocation(PhysicalOperator pop) {
info.addAllocation(pop);
}
+ public void addUnnest(UnnestPOP unnest) {
+ this.unnest = unnest;
+ }
+
+ public UnnestPOP getUnnest() {
+ return this.unnest;
+ }
+
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRel.java
new file mode 100644
index 0000000000..7c49232ac9
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRel.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.common.logical.data.LateralJoin;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.common.DrillCorrelateRelBase;
+
+import java.util.List;
+
+
+public class DrillCorrelateRel extends DrillCorrelateRelBase implements
DrillRel {
+
+ protected DrillCorrelateRel(RelOptCluster cluster, RelTraitSet traits,
RelNode left, RelNode right,
+ CorrelationId correlationId, ImmutableBitSet
requiredColumns, SemiJoinType semiJoinType) {
+ super(cluster, traits, left, right, correlationId, requiredColumns,
semiJoinType);
+ }
+
+ @Override
+ public Correlate copy(RelTraitSet traitSet,
+ RelNode left, RelNode right, CorrelationId correlationId,
+ ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+ return new DrillCorrelateRel(this.getCluster(), this.getTraitSet(), left,
right, correlationId, requiredColumns,
+ this.getJoinType());
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ final List<String> fields = getRowType().getFieldNames();
+ assert DrillJoinRel.isUnique(fields);
+ final int leftCount = left.getRowType().getFieldCount();
+
+ final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0,
0, left, this);
+ final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor,
1, leftCount, right, this);
+
+ return new LateralJoin(leftOp, rightOp);
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
new file mode 100644
index 0000000000..8ac4fb1a6f
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.slf4j.Logger;
+
+public class DrillCorrelateRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillCorrelateRule();
+ protected static final Logger tracer = CalciteTrace.getPlannerTracer();
+
+ private DrillCorrelateRule() {
+ super(RelOptHelper.any(LogicalCorrelate.class, Convention.NONE),
+ DrillRelFactories.LOGICAL_BUILDER,
+ "DrillCorrelateRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final LogicalCorrelate correlate = call.rel(0);
+ final RelNode left = correlate.getLeft();
+ final RelNode right = correlate.getRight();
+ final RelNode convertedLeft = convert(left,
left.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+ final RelNode convertedRight = convert(right,
right.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+
+ final RelTraitSet traits =
correlate.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+ DrillCorrelateRel correlateRel = new
DrillCorrelateRel(correlate.getCluster(),
+ traits, convertedLeft, convertedRight, correlate.getCorrelationId(),
+ correlate.getRequiredColumns(), correlate.getJoinType());
+ call.transformTo(correlateRel);
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
index b6b5c03105..b77fa6103f 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -103,9 +103,24 @@ public LogicalOperator implement(DrillImplementor
implementor) {
* @return
*/
private LogicalOperator implementInput(DrillImplementor implementor, int i,
int offset, RelNode input) {
- final LogicalOperator inputOp = implementor.visitChild(this, i, input);
+ return implementInput(implementor, i, offset, input, this);
+ }
+
+ /**
+ * Check to make sure that the fields of the inputs are the same as the
output field names.
+ * If not, insert a project renaming them.
+ * @param implementor
+ * @param i
+ * @param offset
+ * @param input
+ * @param currentNode the node to be implemented
+ * @return
+ */
+ public static LogicalOperator implementInput(DrillImplementor implementor,
int i, int offset,
+ RelNode input, DrillRel
currentNode) {
+ final LogicalOperator inputOp = implementor.visitChild(currentNode, i,
input);
assert uniqueFieldNames(input.getRowType());
- final List<String> fields = getRowType().getFieldNames();
+ final List<String> fields = currentNode.getRowType().getFieldNames();
final List<String> inputFields = input.getRowType().getFieldNames();
final List<String> outputFields = fields.subList(offset, offset +
inputFields.size());
if (!outputFields.equals(inputFields)) {
@@ -118,7 +133,8 @@ private LogicalOperator implementInput(DrillImplementor
implementor, int i, int
}
}
- private LogicalOperator rename(DrillImplementor implementor, LogicalOperator
inputOp, List<String> inputFields, List<String> outputFields) {
+ private static LogicalOperator rename(DrillImplementor implementor,
LogicalOperator inputOp,
+ List<String> inputFields, List<String>
outputFields) {
Project.Builder builder = Project.builder();
builder.setInput(inputOp);
for (Pair<String, String> pair : Pair.zip(inputFields, outputFields)) {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRel.java
new file mode 100644
index 0000000000..0bf8c6844d
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Unnest;
+import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
+
+
+public class DrillUnnestRel extends DrillUnnestRelBase implements DrillRel {
+
+
+ public DrillUnnestRel(RelOptCluster cluster, RelTraitSet traits,
+ RelDataType rowType, RexNode ref) {
+ super(cluster, traits, ref);
+ this.rowType = rowType;
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ if(getRef() instanceof RexFieldAccess) {
+ final RexFieldAccess fldAccess = (RexFieldAccess)getRef();
+ return new
Unnest(SchemaPath.getSimplePath(fldAccess.getField().getName()));
+ }
+
+ return null;
+ }
+
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRule.java
new file mode 100644
index 0000000000..762eb46f31
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Uncollect;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalValues;
+
+public class DrillUnnestRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillUnnestRule();
+
+ private DrillUnnestRule() {
+ super(RelOptHelper.some(Uncollect.class,
+ RelOptHelper.some(LogicalProject.class,
RelOptHelper.any(LogicalValues.class, Convention.NONE))),
+ DrillRelFactories.LOGICAL_BUILDER, "DrillUnnestRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Uncollect uncollect = call.rel(0);
+ final LogicalProject project = call.rel(1);
+ final LogicalValues values = call.rel(2);
+
+ final RelTraitSet traits =
uncollect.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+ DrillUnnestRel unnest = new DrillUnnestRel(uncollect.getCluster(), traits,
uncollect.getRowType(),
+ project.getProjects().iterator().next());
+ call.transformTo(unnest);
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java
new file mode 100644
index 0000000000..9d308f0135
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.planner.common.DrillCorrelateRelBase;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class CorrelatePrel extends DrillCorrelateRelBase implements Prel {
+
+
+ protected CorrelatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode
left, RelNode right,
+ CorrelationId correlationId, ImmutableBitSet
requiredColumns, SemiJoinType semiJoinType) {
+ super(cluster, traits, left, right, correlationId, requiredColumns,
semiJoinType);
+ }
+ @Override
+ public Correlate copy(RelTraitSet traitSet,
+ RelNode left, RelNode right, CorrelationId
correlationId,
+ ImmutableBitSet requiredColumns, SemiJoinType
joinType) {
+ return new CorrelatePrel(this.getCluster(), this.getTraitSet(), left,
right, correlationId, requiredColumns,
+ this.getJoinType());
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
throws IOException {
+
+ PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator);
+ PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator);
+
+ SemiJoinType jtype = this.getJoinType();
+
+ LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop,
jtype.toJoinType());
+ return creator.addMetadata(this, ljoin);
+ }
+
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X
value) throws E {
+ return visitor.visitPrel(this, value);
+ }
+
+ @Override
+ public Iterator<Prel> iterator() {
+ return PrelUtil.iter(getLeft(), getRight());
+ }
+
+ @Override
+ public boolean needsFinalColumnReordering() {
+ return true;
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+ return BatchSchema.SelectionVectorMode.DEFAULT;
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode getEncoding() {
+ return BatchSchema.SelectionVectorMode.NONE;
+ }
+
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrule.java
new file mode 100644
index 0000000000..4f1e1d884e
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrule.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.logical.DrillCorrelateRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+public class CorrelatePrule extends Prule {
+ public static final RelOptRule INSTANCE = new
CorrelatePrule("Prel.CorrelatePrule",
+ RelOptHelper.any(DrillCorrelateRel.class));
+
+ private CorrelatePrule(String name, RelOptRuleOperand operand) {
+ super(operand, name);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final DrillCorrelateRel correlate = call.rel(0);
+ final RelNode left = correlate.getLeft();
+ final RelNode right = correlate.getRight();
+ RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+ RelTraitSet traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+
+ RelTraitSet corrTraits =
traitsLeft.plus(DrillDistributionTrait.RANDOM_DISTRIBUTED);
+
+ final RelNode convertedLeft = convert(left, traitsLeft);
+ final RelNode convertedRight = convert(right, traitsRight);
+
+ final CorrelatePrel correlatePrel = new
CorrelatePrel(correlate.getCluster(),
+ corrTraits,
+ convertedLeft, convertedRight,
correlate.getCorrelationId(),
+
correlate.getRequiredColumns(),correlate.getJoinType());
+ call.transformTo(correlatePrel);
+ }
+
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 8b5c878e6b..5a40ae42f3 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -112,6 +112,12 @@
public static final EnumeratedStringValidator QUOTING_IDENTIFIERS = new
EnumeratedStringValidator(
QUOTING_IDENTIFIERS_KEY, Quoting.BACK_TICK.string,
Quoting.DOUBLE_QUOTE.string, Quoting.BRACKET.string);
+ /*
+ "planner.enable_unnest_lateral" is to allow users to choose enable
unnest+lateraljoin feature.
+ */
+ public static final String ENABLE_UNNEST_LATERAL_KEY =
"planner.enable_unnest_lateral";
+ public static final BooleanValidator ENABLE_UNNEST_LATERAL = new
BooleanValidator(ENABLE_UNNEST_LATERAL_KEY);
+
/*
Enables rules that re-write query joins in the most optimal way.
Though its turned on be default and its value in query optimization is
undeniable, user may want turn off such
@@ -317,6 +323,10 @@ public boolean isJoinOptimizationEnabled() {
return options.getOption(JOIN_OPTIMIZATION);
}
+ public boolean isUnnestLateralEnabled() {
+ return options.getOption(ENABLE_UNNEST_LATERAL);
+ }
+
@Override
public <T> T unwrap(Class<T> clazz) {
if(clazz == PlannerSettings.class){
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
new file mode 100644
index 0000000000..1e8730514e
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+public class UnnestPrel extends DrillUnnestRelBase implements Prel {
+
+ protected final UnnestPOP unnestPOP;
+
+ public UnnestPrel(RelOptCluster cluster, RelTraitSet traits,
+ RelDataType rowType, RexNode ref) {
+ super(cluster, traits, ref);
+ this.unnestPOP = new UnnestPOP(null,
SchemaPath.getSimplePath(((RexFieldAccess)ref).getField().getName()));
+ this.rowType = rowType;
+ }
+
+ @Override
+ public Iterator<Prel> iterator() {
+ return Collections.emptyIterator();
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X
value) throws E {
+ return visitor.visitPrel(this, value);
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
+ throws IOException {
+ return creator.addMetadata(this, unnestPOP);
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+ return BatchSchema.SelectionVectorMode.DEFAULT;
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode getEncoding() {
+ return BatchSchema.SelectionVectorMode.NONE;
+ }
+
+ @Override
+ public boolean needsFinalColumnReordering() {
+ return true;
+ }
+
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrule.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrule.java
new file mode 100644
index 0000000000..48f4ea964e
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrule.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.logical.DrillUnnestRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+public class UnnestPrule extends Prule {
+ public static final RelOptRule INSTANCE = new UnnestPrule();
+
+ private UnnestPrule() {
+ super(RelOptHelper.any(DrillUnnestRel.class), "UnnestPrule");
+ }
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final DrillUnnestRel unnest = call.rel(0);
+ RexNode ref = unnest.getRef();
+ if (ref instanceof RexFieldAccess) {
+ final RexFieldAccess field = (RexFieldAccess)ref;
+ field.getField().getName();
+ }
+
+ UnnestPrel unnestPrel = new UnnestPrel(unnest.getCluster(),
+ unnest.getTraitSet().plus(Prel.DRILL_PHYSICAL), unnest.getRowType(),
ref);
+
+ call.transformTo(unnestPrel);
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
index da7b108f60..6b97841546 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/UnsupportedOperatorsVisitor.java
@@ -266,6 +266,15 @@ public SqlNode visit(SqlCall sqlCall) {
}
}
+ //Disable UNNEST if the configuration disable it
+ if (sqlCall.getKind() == SqlKind.UNNEST) {
+ if (!context.getPlannerSettings().isUnnestLateralEnabled()) {
+
unsupportedOperatorCollector.setException(SqlUnsupportedException.ExceptionType.RELATIONAL,
+ "Unnest is not enabled per configuration");
+ throw new UnsupportedOperationException();
+ }
+ }
+
// Disable Function
for(String strOperator : disabledOperators) {
if(sqlCall.getOperator().isName(strOperator)) {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 6f59eb4be6..7f02773384 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -115,6 +115,7 @@
new
OptionDefinition(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD),
new OptionDefinition(PlannerSettings.QUOTING_IDENTIFIERS),
new OptionDefinition(PlannerSettings.JOIN_OPTIMIZATION),
+ new OptionDefinition(PlannerSettings.ENABLE_UNNEST_LATERAL),
new OptionDefinition(PlannerSettings.FORCE_2PHASE_AGGR), // for testing
new OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR),
new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf
b/exec/java-exec/src/main/resources/drill-module.conf
index 8874f92042..b2013d576c 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -486,6 +486,7 @@ drill.exec.options: {
planner.enable_topn: true,
planner.enable_type_inference: true,
planner.enable_unionall_distribute: false,
+ planner.enable_unnest_lateral: false,
planner.filter.max_selectivity_estimate_factor: 1.0,
planner.filter.min_selectivity_estimate_factor: 0.0,
planner.force_2phase_aggr: false,
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
new file mode 100644
index 0000000000..2125bd14e2
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.lateraljoin;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.test.BaseTestQuery;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestLateralPlans extends BaseTestQuery {
+
+ @BeforeClass
+ public static void enableUnnestLateral() throws Exception {
+ test("alter session set `planner.enable_unnest_lateral`=true");
+ }
+
+ @Test
+ public void testLateralPlan1() throws Exception {
+ int numOutputRecords =
testPhysical(getFile("lateraljoin/lateralplan1.json"));
+ assertEquals(numOutputRecords, 12);
+ }
+
+ @Test
+ public void testLateralSql() throws Exception {
+ String Sql = "select t.c_name, t2.o_shop as o_shop from
cp.`lateraljoin/nested-customer.json` t,"
+ + " unnest(t.orders) t2 limit 1";
+ testBuilder()
+ .unOrdered()
+ .sqlQuery(Sql)
+ .baselineColumns("c_name", "o_shop")
+ .baselineValues("customer1", "Meno Park 1st")
+ .go();
+ }
+
+ @Test
+ public void testExplainLateralSql() throws Exception {
+ String Sql = "explain plan without implementation for"
+ + " select t.c_name, t2.o_shop as o_shop from
cp.`lateraljoin/nested-customer.json` t,"
+ + " unnest(t.orders) t2 limit 1";
+ test(Sql);
+ }
+
+ @Test
+ public void testFilterPushCorrelate() throws Exception {
+ test("alter session set `planner.slice_target`=1");
+ String query = "select t.c_name, t2.o_shop as o_shop from
cp.`lateraljoin/nested-customer.json` t,"
+ + " unnest(t.orders) t2 where t.c_name='customer1' AND t2.o_shop='Meno
Park 1st' ";
+ PlanTestBase.testPlanMatchingPatterns(query,
+ new String[]
{"Correlate(.*[\n\r])+.*Filter(.*[\n\r])+.*Scan(.*[\n\r])+.*Filter"},
+ new String[]{}
+ );
+ testBuilder()
+ .unOrdered()
+ .sqlQuery(query)
+ .baselineColumns("c_name", "o_shop")
+ .baselineValues("customer1", "Meno Park 1st")
+ .go();
+ }
+
+ @Test
+ @Ignore("naming of single column")
+ public void testLateralSqlPlainCol() throws Exception {
+ String Sql = "select t.c_name, t2.c_phone from
cp.`lateraljoin/nested-customer.json` t, unnest(t.c_phone) t2 limit 1";
+ testBuilder()
+ .unOrdered()
+ .sqlQuery(Sql)
+ .baselineColumns("c_name", "c_phone_flat")
+ .baselineValues("customer1", "6505200001")
+ .go();
+
+ }
+
+ @Test
+ public void testLateralSqlStar() throws Exception {
+ String Sql = "select * from cp.`lateraljoin/nested-customer.json` t,
unnest(t.orders) t2 limit 1";
+ test(Sql);
+ }
+
+ @Test
+ @Ignore("To be fixed: how to specify columns names for table alias in
dynamic case")
+ public void testLateralSqlWithAS() throws Exception {
+ String Sql = "select t.c_name, t2.o_shop from
cp.`lateraljoin/nested-customer.parquet` t,"
+ + " unnest(t.orders) as t2(o_shop) limit 1";
+ testBuilder()
+ .unOrdered()
+ .sqlQuery(Sql)
+ .baselineColumns("c_name", "o_shop")
+ .baselineValues("customer1", "Meno Park 1st")
+ .go();
+
+ }
+
+ @Test
+ public void testSubQuerySql() throws Exception {
+ String Sql = "select t2.os.* from (select t.orders as os from
cp.`lateraljoin/nested-customer.parquet` t) t2";
+ test(Sql);
+ }
+}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
index 137966ba33..8ec0c96fa8 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
@@ -42,6 +42,7 @@
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -132,6 +133,7 @@ public void testUnnestVarWidthColumn() {
}
@Test
+ @Ignore("With DRILL-6321 commits, Unnest's output could be multiplec olumns")
public void testUnnestMapColumn() {
Object[][] data = getMapData();
diff --git a/exec/java-exec/src/test/resources/lateraljoin/lateralplan1.json
b/exec/java-exec/src/test/resources/lateraljoin/lateralplan1.json
new file mode 100644
index 0000000000..9be5878a46
--- /dev/null
+++ b/exec/java-exec/src/test/resources/lateraljoin/lateralplan1.json
@@ -0,0 +1,95 @@
+{
+ "head" : {
+ "version" : 1,
+ "generator" : {
+ "type" : "DefaultSqlHandler",
+ "info" : ""
+ },
+ "type" : "APACHE_DRILL_PHYSICAL",
+ "options" : [ {
+ "kind" : "DOUBLE",
+ "accessibleScopes" : "ALL",
+ "name" : "planner.index.noncovering_selectivity_threshold",
+ "float_val" : 0.25,
+ "scope" : "SESSION"
+ } ],
+ "queue" : 0,
+ "hasResourcePlan" : false,
+ "resultMode" : "EXEC"
+ },
+ "graph" : [ {
+ "pop" : "parquet-scan",
+ "@id" : 5,
+ "userName" : "root",
+ "entries" : [ {
+ "path" : "lateraljoin/nested-customer.parquet"
+ } ],
+ "storage" : {
+ "type" : "file",
+ "enabled" : true,
+ "connection" : "classpath:///",
+ "formats" : {
+ "json" : {
+ "type" : "json"
+ },
+ "parquet" : {
+ "type" : "parquet"
+ }
+ }
+ },
+ "format" : {
+ "type" : "parquet"
+ },
+ "cost" : 1000.0
+ }, {
+ "pop" : "unnest",
+ "@id" : 7,
+ "userName" : "root",
+ "column" : "`orders`",
+ "cost" : 1000.0
+ }, {
+ "pop" : "project",
+ "@id" : 6,
+ "exprs" : [ {
+ "ref" : "`ITEM`",
+ "expr" : "`o_shop`"
+ }],
+ "child" : 7,
+ "outputProj" : false,
+ "initialAllocation" : 1000000,
+ "maxAllocation" : 10000000000,
+ "cost" : 1000.0
+ }, {
+ "pop" : "lateral-join",
+ "@id" : 4,
+ "left" : 5,
+ "right" : 6,
+ "unnestForLateralJoin": 7,
+ "initialAllocation" : 1000000,
+ "maxAllocation" : 10000000000,
+ "cost" : 1000.0
+ }, {
+ "pop" : "project",
+ "@id" : 3,
+ "exprs" : [ {
+ "ref" : "`ITEM`",
+ "expr" : "`c_name`"
+ }, {
+ "ref" : "`ITEM1`",
+ "expr" : "`o_shop`"
+ } ],
+ "child" : 4,
+ "outputProj" : false,
+ "initialAllocation" : 1000000,
+ "maxAllocation" : 10000000000,
+ "cost" : 1000.0
+ }, {
+ "pop" : "screen",
+ "@id" : 0,
+ "child" : 3,
+ "initialAllocation" : 1000000,
+ "maxAllocation" : 10000000000,
+ "cost" : 1000.0
+ } ]
+}
+
diff --git a/exec/java-exec/src/test/resources/lateraljoin/nested-customer.json
b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.json
new file mode 100644
index 0000000000..710ca420a2
--- /dev/null
+++ b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.json
@@ -0,0 +1,114 @@
+{
+ "c_name" : "customer1",
+ "c_id" : 1,
+ "c_phone" : ["6505200001", "4085201234", "6125205678"],
+ "orders" : [{"o_id": 1, "o_shop": "Meno Park 1st", "o_amount": 4.5,
+ "items" : [ {"i_name" : "paper towel", "i_number": 2,
"i_supplier": "oregan"},
+ {"i_name" : "map", "i_number": 1, "i_supplier":
"washington"},
+ {"i_name" : "cheese", "i_number": 9, "i_supplier":
"california"}
+ ]
+
+ },
+ {"o_id": 2, "o_shop": "Mountain View 1st", "o_amount": 104.5,
+ "items" : [ {"i_name" : "beef", "i_number": 3, "i_supplier":
"montana"},
+ {"i_name" : "tooth paste", "i_number": 4,
"i_supplier": "washington"},
+ {"i_name" : "hat", "i_number": 7, "i_supplier":
"california"}
+ ]
+
+ },
+ {"o_id": 3, "o_shop": "Sunnyvale 1st", "o_amount": 294.5,
+ "items" : [ {"i_name" : "paper towel", "i_number": 5,
"i_supplier": "oregan"},
+ {"i_name" : "tooth paste", "i_number": 6,
"i_supplier": "washington"},
+ {"i_name" : "cheese", "i_number": 8, "i_supplier":
"california"}
+ ]
+ }
+ ],
+ "c_address" : "bay area, CA"
+}
+{
+ "c_name" : "customer2",
+ "c_id" : 2,
+ "c_phone" : ["1505200001", "7085201234", "2125205678"],
+ "orders" : [{"o_id": 10, "o_shop": "Mountain View 1st", "o_amount": 724.5,
+ "items" : [ {"i_name" : "beef", "i_number": 12, "i_supplier":
"montana"},
+ {"i_name" : "tooth paste", "i_number": 11,
"i_supplier": "washington"},
+ {"i_name" : "hat", "i_number": 10, "i_supplier":
"california"}
+ ]
+
+ },
+
+ {"o_id": 11, "o_shop": "Sunnyvale 1st", "o_amount": 179.5,
+ "items" : [ {"i_name" : "paper towel", "i_number": 13,
"i_supplier": "oregan"},
+ {"i_name" : "tooth paste", "i_number": 14,
"i_supplier": "washington"},
+ {"i_name" : "cheese", "i_number": 15, "i_supplier":
"california"}
+ ]
+ },
+ {"o_id": 12, "o_shop": "Meno Park 1st", "o_amount": 80.0,
+ "items" : [ {"i_name" : "paper towel", "i_number": 13,
"i_supplier": "oregan"},
+ {"i_name" : "tooth paste", "i_number": 14,
"i_supplier": "washington"},
+ {"i_name" : "cheese", "i_number": 15, "i_supplier":
"california"}
+ ]
+ }
+ ],
+ "c_address" : "LA, CA"
+}
+{
+ "c_name" : "customer3",
+ "c_id" : 3,
+ "c_phone" : ["1205200001", "7285201234", "2325205678"],
+ "orders" : [{"o_id": 21, "o_shop": "Meno Park 1st", "o_amount": 192.5,
+ "items" : [ {"i_name" : "beef", "i_number": 22, "i_supplier":
"montana"},
+ {"i_name" : "tooth paste", "i_number": 21,
"i_supplier": "washington"},
+ {"i_name" : "hat", "i_number": 20, "i_supplier":
"california"}
+ ]
+
+ },
+
+ {"o_id": 22, "o_shop": "Mountain View 1st", "o_amount": 680.9,
+ "items" : [ {"i_name" : "paper towel", "i_number": 23,
"i_supplier": "oregan"},
+ {"i_name" : "tooth paste", "i_number": 24,
"i_supplier": "washington"},
+ {"i_name" : "cheese", "i_number": 25, "i_supplier":
"california"}
+ ]
+ },
+
+ {"o_id": 23, "o_shop": "Sunnyvale 1st", "o_amount": 772.2,
+ "items" : [ {"i_name" : "paper towel", "i_number": 26,
"i_supplier": "oregan"},
+ {"i_name" : "tooth paste", "i_number": 27,
"i_supplier": "washington"},
+ {"i_name" : "cheese", "i_number": 28, "i_supplier":
"california"}
+ ]
+ }
+
+ ],
+ "c_address" : "bay area, CA"
+}
+{
+ "c_name" : "customer4",
+ "c_id" : 4,
+ "c_phone" : ["6509200001", "4088201234", "6127205678"],
+ "orders" : [{"o_id": 30, "o_shop": "Mountain View 1st", "o_amount": 870.2,
+ "items" : [ {"i_name" : "beef", "i_number": 32, "i_supplier":
"montana"},
+ {"i_name" : "tooth paste", "i_number": 31,
"i_supplier": "washington"},
+ {"i_name" : "hat", "i_number": 30, "i_supplier":
"california"}
+ ]
+
+ },
+
+ {"o_id": 31, "o_shop": "Sunnyvale 1st", "o_amount": 970.5,
+ "items" : [ {"i_name" : "beef", "i_number": 32, "i_supplier":
"montana"},
+ {"i_name" : "tooth paste", "i_number": 31,
"i_supplier": "washington"},
+ {"i_name" : "cheese", "i_number": 30, "i_supplier":
"california"}
+ ]
+
+ },
+
+ {"o_id": 32, "o_shop": "Meno Park 1st", "o_amount": 1030.1,
+ "items" : [ {"i_name" : "paper towel", "i_number": 36,
"i_supplier": "oregan"},
+ {"i_name" : "tooth paste", "i_number": 37,
"i_supplier": "washington"},
+ {"i_name" : "cheese", "i_number": 38, "i_supplier":
"california"}
+ ]
+ }
+
+ ],
+ "c_address" : "LA, CA"
+}
+
diff --git
a/exec/java-exec/src/test/resources/lateraljoin/nested-customer.parquet
b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.parquet
new file mode 100644
index 0000000000..97d898a719
Binary files /dev/null and
b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.parquet differ
diff --git
a/logical/src/main/java/org/apache/drill/common/logical/data/LateralJoin.java
b/logical/src/main/java/org/apache/drill/common/logical/data/LateralJoin.java
new file mode 100644
index 0000000000..55334221e7
--- /dev/null
+++
b/logical/src/main/java/org/apache/drill/common/logical/data/LateralJoin.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+
+import java.util.Iterator;
+
+@JsonTypeName("lateral-join")
+public class LateralJoin extends LogicalOperatorBase {
+ private final LogicalOperator left;
+ private final LogicalOperator right;
+
+ @JsonCreator
+ public LateralJoin(@JsonProperty("left") LogicalOperator left,
@JsonProperty("right") LogicalOperator right) {
+ super();
+ this.left = left;
+ this.right = right;
+ left.registerAsSubscriber(this);
+ right.registerAsSubscriber(this);
+ }
+
+ public LogicalOperator getLeft() {
+ return left;
+ }
+
+ public LogicalOperator getRight() {
+ return right;
+ }
+
+ @Override
+ public Iterator<LogicalOperator> iterator() {
+ return Iterators.forArray(getLeft(), getRight());
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E>
logicalVisitor, X value) throws E {
+ return logicalVisitor.visitLateralJoin(this, value);
+ }
+}
diff --git
a/logical/src/main/java/org/apache/drill/common/logical/data/Unnest.java
b/logical/src/main/java/org/apache/drill/common/logical/data/Unnest.java
new file mode 100644
index 0000000000..9f9e964956
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/Unnest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+
+@JsonTypeName("unnest")
+public class Unnest extends SourceOperator {
+
+ private final SchemaPath column;
+
+ @JsonCreator
+ public Unnest(@JsonProperty("column") SchemaPath column) {
+ this.column = column;
+ }
+
+ public SchemaPath getColumn() {
+ return column;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E>
logicalVisitor, X value) throws E {
+ return logicalVisitor.visitUnnest(this, value);
+ }
+}
diff --git
a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
index a04c3a4bc8..4fd64c5206 100644
---
a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
+++
b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.common.logical.data.visitors;
+import org.apache.drill.common.logical.data.LateralJoin;
+import org.apache.drill.common.logical.data.Unnest;
import org.apache.drill.common.logical.data.Values;
import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.Flatten;
@@ -123,4 +125,14 @@ public T visitValues(Values constant, X value) throws E {
public T visitWriter(Writer writer, X value) throws E {
return visitOp(writer, value);
}
+
+ @Override
+ public T visitUnnest(Unnest unnest, X value) throws E {
+ return visitOp(unnest, value);
+ }
+
+ @Override
+ public T visitLateralJoin(LateralJoin lateralJoin, X value) throws E {
+ return visitOp(lateralJoin, value);
+ }
}
diff --git
a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
index 5b5ca5ff90..55fa83880f 100644
---
a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
+++
b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
@@ -18,6 +18,8 @@
package org.apache.drill.common.logical.data.visitors;
+import org.apache.drill.common.logical.data.LateralJoin;
+import org.apache.drill.common.logical.data.Unnest;
import org.apache.drill.common.logical.data.Values;
import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.Flatten;
@@ -63,4 +65,7 @@
public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
public RETURN visitWindow(Window window, EXTRA value) throws EXCEP;
public RETURN visitWriter(Writer writer, EXTRA value) throws EXCEP;
+
+ public RETURN visitUnnest(Unnest unnest, EXTRA value) throws EXCEP;
+ public RETURN visitLateralJoin(LateralJoin lateralJoin, EXTRA value)
throws EXCEP;
}
diff --git a/pom.xml b/pom.xml
index 151f2084c0..43207c104d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
<dep.guava.version>18.0</dep.guava.version>
<forkCount>2</forkCount>
<parquet.version>1.8.1-drill-r0</parquet.version>
- <calcite.version>1.16.0-drill-r0</calcite.version>
+ <calcite.version>1.16.0-drill-r1</calcite.version>
<avatica.version>1.11.0</avatica.version>
<janino.version>2.7.6</janino.version>
<sqlline.version>1.1.9-drill-r7</sqlline.version>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Lateral Join: Planning changes - enable submitting physical plan
> ----------------------------------------------------------------
>
> Key: DRILL-6321
> URL: https://issues.apache.org/jira/browse/DRILL-6321
> Project: Apache Drill
> Issue Type: Task
> Reporter: Parth Chandra
> Assignee: Chunhui Shi
> Priority: Major
> Labels: ready-to-commit
> Fix For: 1.14.0
>
>
> Implement changes to enable submitting a physical plan containing lateral and
> unnest.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)