This is an automated email from the ASF dual-hosted git repository. parthc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 0a4a37e11f2f2ad66de80918a4b34665e42803fc Author: chunhui-shi <[email protected]> AuthorDate: Mon Mar 26 12:15:09 2018 -0700 DRILL-6321: Lateral Join and Unnest - initial implementation for parser and planning --- .../org/apache/drill/exec/opt/BasicOptimizer.java | 6 ++ .../physical/base/AbstractPhysicalVisitor.java | 6 ++ .../drill/exec/physical/base/PhysicalVisitor.java | 2 + .../drill/exec/physical/config/LateralJoinPOP.java | 22 +++- .../drill/exec/physical/config/UnnestPOP.java | 32 ++++-- .../impl/join/LateralJoinBatchCreator.java | 9 +- .../physical/impl/unnest/UnnestRecordBatch.java | 20 +++- .../apache/drill/exec/planner/PlannerPhase.java | 10 +- .../exec/planner/common/DrillCorrelateRelBase.java | 55 ++++++++++ .../exec/planner/common/DrillUnnestRelBase.java | 54 ++++++++++ .../drill/exec/planner/fragment/Materializer.java | 38 +++++++ .../exec/planner/logical/DrillCorrelateRel.java | 51 +++++++++ .../exec/planner/logical/DrillCorrelateRule.java | 53 ++++++++++ .../logical/DrillUnnestRel.java} | 32 ++++-- .../exec/planner/logical/DrillUnnestRule.java | 48 +++++++++ .../drill/exec/planner/physical/CorrelatePrel.java | 89 ++++++++++++++++ .../exec/planner/physical/CorrelatePrule.java | 56 ++++++++++ .../drill/exec/planner/physical/UnnestPrel.java | 78 ++++++++++++++ .../drill/exec/planner/physical/UnnestPrule.java | 49 +++++++++ .../impl/lateraljoin/TestLateralPhysicalPlan.java | 88 ++++++++++++++++ .../test/resources/lateraljoin/lateralplan1.json | 95 +++++++++++++++++ .../resources/lateraljoin/nested-customer.json | 114 +++++++++++++++++++++ .../resources/lateraljoin/nested-customer.parquet | Bin 0 -> 3360 bytes .../drill/common/logical/data/LateralJoin.java | 45 ++++---- .../apache/drill/common/logical/data/Unnest.java | 33 ++++-- .../data/visitors/AbstractLogicalVisitor.java | 12 +++ .../logical/data/visitors/LogicalVisitor.java | 5 + 27 files changed, 1046 insertions(+), 56 deletions(-) diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 9b04e94..36e74a0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -37,6 +37,7 @@ import org.apache.drill.common.logical.data.Project; import org.apache.drill.common.logical.data.Scan; import org.apache.drill.common.logical.data.SinkOperator; import org.apache.drill.common.logical.data.Store; +import org.apache.drill.common.logical.data.Unnest; import org.apache.drill.common.logical.data.Window; import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor; import org.apache.drill.common.types.TypeProtos; @@ -238,5 +239,10 @@ public class BasicOptimizer extends Optimizer { final PhysicalOperator child = filter.getInput().accept(this, obj); return new SelectionVectorRemover(new org.apache.drill.exec.physical.config.Filter(child, filter.getExpr(), 1.0f)); } + + @Override + public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) throws OptimizerException { + return new org.apache.drill.exec.physical.config.UnnestPOP(null, unnest.getColumn()); + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 86a31c1..340c303 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.config.IteratorValidator; +import 
org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; @@ -181,6 +182,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitLateralJoin(LateralJoinPOP lateralJoinPOP, X value) throws E { + return visitOp(lateralJoinPOP, value); + } + + @Override public T visitIteratorValidator(IteratorValidator op, X value) throws E { return visitOp(op, value); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 5c926ad..f2e53eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.config.IteratorValidator; +import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; @@ -78,6 +79,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP; public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws EXCEP; public RETURN visitUnnest(UnnestPOP unnest, EXTRA value) throws EXCEP; + public RETURN visitLateralJoin(LateralJoinPOP lateralJoinPOP, EXTRA value) throws EXCEP; public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws 
EXCEP; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java index 946b4a6..fab89a2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.exec.physical.base.AbstractJoinPop; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import java.util.List; @@ -33,6 +34,9 @@ import java.util.List; public class LateralJoinPOP extends AbstractJoinPop { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class); + @JsonProperty("unnestForLateralJoin") + private UnnestPOP unnestForLateralJoin; + @JsonCreator public LateralJoinPOP( @JsonProperty("left") PhysicalOperator left, @@ -45,11 +49,27 @@ public class LateralJoinPOP extends AbstractJoinPop { public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.size() == 2, "Lateral join should have two physical operators"); - return new LateralJoinPOP(children.get(0), children.get(1), joinType); + LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType); + newPOP.unnestForLateralJoin = this.unnestForLateralJoin; + return newPOP; + } + + @JsonProperty("unnestForLateralJoin") + public UnnestPOP getUnnestForLateralJoin() { + return this.unnestForLateralJoin; + } + + public void setUnnestForLateralJoin(UnnestPOP unnest) { + this.unnestForLateralJoin = unnest; } @Override public int getOperatorType() { return CoreOperatorType.LATERAL_JOIN_VALUE; } + + @Override + public <T, X, E 
extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitLateralJoin(this, value); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java index 6e35134..37ba495 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java @@ -18,36 +18,49 @@ package org.apache.drill.exec.physical.config; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.collect.Iterators; +import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.physical.base.AbstractSingle; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.Leaf; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.impl.unnest.UnnestRecordBatch; import java.util.Iterator; import static org.apache.drill.exec.proto.UserBitShared.CoreOperatorType.UNNEST_VALUE; @JsonTypeName("unnest") -public class UnnestPOP extends AbstractSingle { +public class UnnestPOP extends AbstractBase implements Leaf { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestPOP.class); private SchemaPath column; + @JsonIgnore + private UnnestRecordBatch unnestBatch; + @JsonCreator public UnnestPOP( @JsonProperty("child") PhysicalOperator child, // Operator with incoming record batch @JsonProperty("column") SchemaPath column) { - super(child); this.column = column; } + @Override + public 
PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { + assert children.isEmpty(); + UnnestPOP newUnnest = new UnnestPOP(null, column); + newUnnest.addUnnestBatch(this.unnestBatch); + return newUnnest; + } @Override public Iterator<PhysicalOperator> iterator() { - return Iterators.singletonIterator(child); + return Iterators.emptyIterator(); } public SchemaPath getColumn() { @@ -59,10 +72,13 @@ public class UnnestPOP extends AbstractSingle { return physicalVisitor.visitUnnest(this, value); } - @Override - public PhysicalOperator getNewWithChild(PhysicalOperator child) { - UnnestPOP unnest = new UnnestPOP(child, column); - return unnest; + public void addUnnestBatch(UnnestRecordBatch unnestBatch) { + this.unnestBatch = unnestBatch; + } + + @JsonIgnore + public UnnestRecordBatch getUnnestBatch() { + return this.unnestBatch; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java index 6ed593d..f868596 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java @@ -19,7 +19,9 @@ package org.apache.drill.exec.physical.impl.join; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.ExecutorFragmentContext; +import org.apache.drill.exec.physical.base.LateralContract; import org.apache.drill.exec.physical.config.LateralJoinPOP; +import org.apache.drill.exec.physical.config.UnnestPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -29,6 +31,11 @@ public class LateralJoinBatchCreator implements BatchCreator<LateralJoinPOP> { @Override public LateralJoinBatch getBatch(ExecutorFragmentContext context, 
LateralJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { - return new LateralJoinBatch(config, context, children.get(0), children.get(1)); + LateralJoinBatch ljBatch = new LateralJoinBatch(config, context, children.get(0), children.get(1)); + UnnestPOP unnest = config.getUnnestForLateralJoin(); + if (unnest != null) { + unnest.getUnnestBatch().setIncoming((LateralContract)ljBatch); + } + return ljBatch; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java index d366c80..3ef8179 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.unnest; import com.google.common.base.Preconditions; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.FieldReference; @@ -28,6 +29,9 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.config.UnnestPOP; +import org.apache.drill.exec.physical.impl.join.LateralJoinBatch; +import org.apache.drill.exec.record.AbstractRecordBatch; +import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.AbstractTableFunctionRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; @@ -37,9 +41,11 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import 
org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.exec.vector.complex.RepeatedMapVector; import org.apache.drill.exec.vector.complex.RepeatedValueVector; +import java.util.Iterator; import java.util.List; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; @@ -81,6 +87,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO } } + /** * Memory manager for Unnest. Estimates the batch size exactly like we do for Flatten. */ @@ -134,6 +141,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws OutOfMemoryException { super(pop, context); + pop.addUnnestBatch(this); // get the output batch size from config. int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); memoryManager = new UnnestMemoryManager(configuredBatchSize); @@ -166,7 +174,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO hasRemainder = false; // whatever the case, we need to stop processing the current row. 
} - @Override public IterOutcome innerNext() { @@ -261,7 +268,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO unnest.setUnnestField(vector); } - @Override protected IterOutcome doWork() { Preconditions.checkNotNull(lateral); memoryManager.update(); @@ -355,7 +361,15 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO final ValueVector unnestVector = transferPair.getTo(); transfers.add(transferPair); - container.add(unnestVector); + if (unnestVector instanceof MapVector) { + Iterator<ValueVector> it = unnestVector.iterator(); + while (it.hasNext()) { + container.add(it.next()); + } + } + else { + container.add(unnestVector); + } logger.debug("Added transfer for unnest expression."); container.buildSchema(SelectionVectorMode.NONE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java index 17fedc4..3196bd0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java @@ -28,6 +28,7 @@ import org.apache.calcite.tools.RuleSet; import org.apache.calcite.tools.RuleSets; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.planner.logical.DrillAggregateRule; +import org.apache.drill.exec.planner.logical.DrillCorrelateRule; import org.apache.drill.exec.planner.logical.DrillFilterAggregateTransposeRule; import org.apache.drill.exec.planner.logical.DrillFilterItemStarReWriterRule; import org.apache.drill.exec.planner.logical.DrillFilterJoinRules; @@ -48,11 +49,13 @@ import org.apache.drill.exec.planner.logical.DrillRelFactories; import org.apache.drill.exec.planner.logical.DrillScanRule; import org.apache.drill.exec.planner.logical.DrillSortRule; import org.apache.drill.exec.planner.logical.DrillUnionAllRule; +import 
org.apache.drill.exec.planner.logical.DrillUnnestRule; import org.apache.drill.exec.planner.logical.DrillValuesRule; import org.apache.drill.exec.planner.logical.DrillWindowRule; import org.apache.drill.exec.planner.logical.partition.ParquetPruneScanRule; import org.apache.drill.exec.planner.logical.partition.PruneScanRule; import org.apache.drill.exec.planner.physical.ConvertCountToDirectScan; +import org.apache.drill.exec.planner.physical.CorrelatePrule; import org.apache.drill.exec.planner.physical.DirectScanPrule; import org.apache.drill.exec.planner.physical.FilterPrule; import org.apache.drill.exec.planner.physical.HashAggPrule; @@ -70,6 +73,7 @@ import org.apache.drill.exec.planner.physical.SortConvertPrule; import org.apache.drill.exec.planner.physical.SortPrule; import org.apache.drill.exec.planner.physical.StreamAggPrule; import org.apache.drill.exec.planner.physical.UnionAllPrule; +import org.apache.drill.exec.planner.physical.UnnestPrule; import org.apache.drill.exec.planner.physical.ValuesPrule; import org.apache.drill.exec.planner.physical.WindowPrule; import org.apache.drill.exec.planner.physical.WriterPrule; @@ -307,7 +311,9 @@ public enum PlannerPhase { DrillSortRule.INSTANCE, DrillJoinRule.INSTANCE, DrillUnionAllRule.INSTANCE, - DrillValuesRule.INSTANCE + DrillValuesRule.INSTANCE, + DrillUnnestRule.INSTANCE, + DrillCorrelateRule.INSTANCE ).build(); /** @@ -442,6 +448,8 @@ public enum PlannerPhase { ruleList.add(UnionAllPrule.INSTANCE); ruleList.add(ValuesPrule.INSTANCE); ruleList.add(DirectScanPrule.INSTANCE); + ruleList.add(UnnestPrule.INSTANCE); + ruleList.add(CorrelatePrule.INSTANCE); ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_PROJECT); ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_SCAN); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillCorrelateRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillCorrelateRelBase.java new file mode 100644 index 0000000..ea994ba --- 
/dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillCorrelateRelBase.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.common; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Correlate; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.sql.SemiJoinType; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.planner.cost.DrillCostBase; +import org.apache.drill.exec.planner.physical.PrelUtil; + + +public abstract class DrillCorrelateRelBase extends Correlate implements DrillRelNode { + public DrillCorrelateRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) { + super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType); + 
} + + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + DrillCostBase.DrillCostFactory costFactory = (DrillCostBase.DrillCostFactory) planner.getCostFactory(); + + double rowCount = mq.getRowCount(this.getLeft()); + long fieldWidth = PrelUtil.getPlannerSettings(planner).getOptions() + .getOption(ExecConstants.AVERAGE_FIELD_WIDTH_KEY).num_val; + + double rowSize = (this.getLeft().getRowType().getFieldList().size()) * fieldWidth; + + double cpuCost = rowCount * rowSize * DrillCostBase.BASE_CPU_COST; + double memCost = 0; + return costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java new file mode 100644 index 0000000..04bb2d6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.common; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.exec.planner.cost.DrillCostBase; + +public class DrillUnnestRelBase extends AbstractRelNode implements DrillRelNode { + + final protected RexNode ref; + + public DrillUnnestRelBase(RelOptCluster cluster, RelTraitSet traitSet, RexNode ref) { + super(cluster, traitSet); + this.ref = ref; + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + + double rowCount = mq.getRowCount(this); + // Attribute small cost for projecting simple fields. In reality projecting simple columns is not free and + // this allows projection pushdown/project-merge rules to kick-in thereby eliminating unneeded columns from + // the projection. 
+ double cpuCost = DrillCostBase.BASE_CPU_COST * rowCount * this.getRowType().getFieldCount(); + + DrillCostBase.DrillCostFactory costFactory = (DrillCostBase.DrillCostFactory) planner.getCostFactory(); + return costFactory.makeCost(rowCount, cpuCost, 0, 0); + } + + public RexNode getRef() { + return this.ref; + } +} \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java index 2fc7541..987e65c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -30,6 +30,8 @@ import org.apache.drill.exec.physical.base.Store; import org.apache.drill.exec.physical.base.SubScan; import com.google.common.collect.Lists; +import org.apache.drill.exec.physical.config.LateralJoinPOP; +import org.apache.drill.exec.physical.config.UnnestPOP; public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Materializer.IndexedFragmentNode, ExecutionSetupException>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Materializer.class); @@ -106,10 +108,38 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate return newOp; } + + @Override + public PhysicalOperator visitLateralJoin(LateralJoinPOP op, IndexedFragmentNode iNode) throws ExecutionSetupException { + iNode.addAllocation(op); + List<PhysicalOperator> children = Lists.newArrayList(); + + children.add(op.getLeft().accept(this, iNode)); + children.add(op.getRight().accept(this, iNode)); + UnnestPOP unnestInLeftInput = iNode.getUnnest(); + + PhysicalOperator newOp = op.getNewWithChildren(children); + newOp.setCost(op.getCost()); + newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId()); + + ((LateralJoinPOP)newOp).setUnnestForLateralJoin(unnestInLeftInput); + + return newOp; + } 
+ + @Override + public PhysicalOperator visitUnnest(UnnestPOP unnest, IndexedFragmentNode value) throws ExecutionSetupException { + PhysicalOperator newOp = visitOp(unnest, value); + value.addUnnest((UnnestPOP)newOp); + return newOp; + } + public static class IndexedFragmentNode{ final Wrapper info; final int minorFragmentId; + UnnestPOP unnest = null; + public IndexedFragmentNode(int minorFragmentId, Wrapper info) { super(); this.info = info; @@ -132,6 +162,14 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate info.addAllocation(pop); } + public void addUnnest(UnnestPOP unnest) { + this.unnest = unnest; + } + + public UnnestPOP getUnnest() { + return this.unnest; + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRel.java new file mode 100644 index 0000000..e5eee4e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRel.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.logical; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Correlate; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.sql.SemiJoinType; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.exec.planner.common.DrillCorrelateRelBase; + + +public class DrillCorrelateRel extends DrillCorrelateRelBase implements DrillRel { + + protected DrillCorrelateRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) { + super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType); + } + + @Override + public Correlate copy(RelTraitSet traitSet, + RelNode left, RelNode right, CorrelationId correlationId, + ImmutableBitSet requiredColumns, SemiJoinType joinType) { + return new DrillCorrelateRel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns, + this.getJoinType()); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + //TODO: implementation for direct convert from RelNode to logical operator for explainPlan + return null; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java new file mode 100644 index 0000000..8ac4fb1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.slf4j.Logger;
+
+/**
+ * Planner rule that replaces a {@link LogicalCorrelate} in {@link Convention#NONE} with a
+ * {@link DrillCorrelateRel} whose inputs are converted to {@link DrillRel#DRILL_LOGICAL}.
+ */
+public class DrillCorrelateRule extends RelOptRule {
+  public static final RelOptRule INSTANCE = new DrillCorrelateRule();
+  protected static final Logger tracer = CalciteTrace.getPlannerTracer();
+
+  private DrillCorrelateRule() {
+    super(RelOptHelper.any(LogicalCorrelate.class, Convention.NONE),
+        DrillRelFactories.LOGICAL_BUILDER,
+        "DrillCorrelateRule");
+  }
+
+  /** Converts the matched correlate and both of its inputs, left input first. */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final LogicalCorrelate matched = call.rel(0);
+    final RelNode leftInput = matched.getLeft();
+    final RelNode rightInput = matched.getRight();
+    final RelNode drillLeft = toDrillLogical(leftInput);
+    final RelNode drillRight = toDrillLogical(rightInput);
+
+    final RelTraitSet drillTraits = matched.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+    call.transformTo(new DrillCorrelateRel(matched.getCluster(),
+        drillTraits, drillLeft, drillRight, matched.getCorrelationId(),
+        matched.getRequiredColumns(), matched.getJoinType()));
+  }
+
+  /** Requests the given input with its own traits plus the Drill logical convention, simplified. */
+  private static RelNode toDrillLogical(RelNode input) {
+    return convert(input, input.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRel.java similarity index 50% copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java copy to exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRel.java index 6ed593d..ab827e5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRel.java @@ -15,20 +15,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.drill.exec.physical.impl.join; +package org.apache.drill.exec.planner.logical; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.ExecutorFragmentContext; -import org.apache.drill.exec.physical.config.LateralJoinPOP; -import org.apache.drill.exec.physical.impl.BatchCreator; -import org.apache.drill.exec.record.RecordBatch; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; -import java.util.List; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.exec.planner.common.DrillUnnestRelBase; + + +public class DrillUnnestRel extends DrillUnnestRelBase implements DrillRel { + + + public DrillUnnestRel(RelOptCluster cluster, RelTraitSet traits, + RelDataType rowType, RexNode ref) { + super(cluster, traits, ref); + this.rowType = rowType; + } -public class LateralJoinBatchCreator implements BatchCreator<LateralJoinPOP> { @Override - public
LateralJoinBatch getBatch(ExecutorFragmentContext context, LateralJoinPOP config, List<RecordBatch> children) - throws ExecutionSetupException { - return new LateralJoinBatch(config, context, children.get(0), children.get(1)); + public LogicalOperator implement(DrillImplementor implementor) { + //TODO: implementation for direct convert from RelNode to logical operator for explainPlan + return null; } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRule.java new file mode 100644 index 0000000..762eb46 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnnestRule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Uncollect;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalValues;
+
+/**
+ * Planner rule that matches {@code Uncollect(LogicalProject(LogicalValues))} and replaces the
+ * whole subtree with a single {@link DrillUnnestRel}.
+ */
+public class DrillUnnestRule extends RelOptRule {
+  public static final RelOptRule INSTANCE = new DrillUnnestRule();
+
+  private DrillUnnestRule() {
+    super(RelOptHelper.some(Uncollect.class,
+        RelOptHelper.some(LogicalProject.class, RelOptHelper.any(LogicalValues.class, Convention.NONE))),
+        DrillRelFactories.LOGICAL_BUILDER, "DrillUnnestRule");
+  }
+
+  /**
+   * Builds the {@link DrillUnnestRel} from the uncollect's cluster and row type and the first
+   * expression of the matched project.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Uncollect uncollect = call.rel(0);
+    final LogicalProject project = call.rel(1);
+    // The LogicalValues operand (rel(2)) only constrains the match; nothing is read from it.
+
+    final RelTraitSet traits = uncollect.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+    // NOTE(review): only the first projected expression is used; any further expressions of
+    // the project are ignored - confirm the matched project always has exactly one.
+    DrillUnnestRel unnest = new DrillUnnestRel(uncollect.getCluster(), traits, uncollect.getRowType(),
+        project.getProjects().iterator().next());
+    call.transformTo(unnest);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java new file mode 100644 index 0000000..9d308f0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrel.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.planner.common.DrillCorrelateRelBase;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Physical ({@link Prel}) correlate node. Its physical operator is a {@link LateralJoinPOP}
+ * built over the physical operators of the left and right inputs.
+ */
+public class CorrelatePrel extends DrillCorrelateRelBase implements Prel {
+
+
+  protected CorrelatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+      CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
+    super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+  }
+
+  /**
+   * Creates a new {@code CorrelatePrel} on this node's cluster from the supplied arguments.
+   *
+   * @param traitSet traits of the new node
+   * @param left left input of the new node
+   * @param right right input of the new node
+   * @param correlationId correlation id of the new node
+   * @param requiredColumns required columns of the new node
+   * @param joinType join type of the new node
+   * @return the newly created node
+   */
+  @Override
+  public Correlate copy(RelTraitSet traitSet,
+      RelNode left, RelNode right, CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+    // Use the requested traits and join type; the previous code silently dropped both
+    // arguments and re-used this node's own values.
+    return new CorrelatePrel(this.getCluster(), traitSet, left, right, correlationId, requiredColumns,
+        joinType);
+  }
+
+  /**
+   * Creates the physical operators of both inputs (left first) and joins them in a
+   * {@link LateralJoinPOP} using this node's join type converted with {@code toJoinType()}.
+   *
+   * @param creator plan creator used for the inputs and to attach metadata to the result
+   * @return the lateral join operator as returned by {@code creator.addMetadata}
+   * @throws IOException if creating an input's physical operator fails
+   */
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+
+    PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator);
+    PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator);
+
+    SemiJoinType jtype = this.getJoinType();
+
+    LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType());
+    return creator.addMetadata(this, ljoin);
+  }
+
+
+  /** Dispatches to the visitor's generic {@code visitPrel}. */
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
+    return visitor.visitPrel(this, value);
+  }
+
+  /** Iterates over the left input followed by the right input. */
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getLeft(), getRight());
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return true;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+    return BatchSchema.SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getEncoding() {
+    return BatchSchema.SelectionVectorMode.NONE;
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrule.java new file mode 100644 index 0000000..4f1e1d8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/CorrelatePrule.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.logical.DrillCorrelateRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+/**
+ * Planner rule that turns a {@link DrillCorrelateRel} into a {@link CorrelatePrel} whose inputs
+ * are requested in the {@link Prel#DRILL_PHYSICAL} convention.
+ */
+public class CorrelatePrule extends Prule {
+  public static final RelOptRule INSTANCE = new CorrelatePrule("Prel.CorrelatePrule",
+      RelOptHelper.any(DrillCorrelateRel.class));
+
+  private CorrelatePrule(String name, RelOptRuleOperand operand) {
+    super(operand, name);
+  }
+
+  /**
+   * Converts both inputs (left first) and emits the physical correlate. The new node's traits
+   * are the left input's physical traits plus {@code RANDOM_DISTRIBUTED}.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillCorrelateRel logicalCorrelate = call.rel(0);
+    final RelNode leftInput = logicalCorrelate.getLeft();
+    final RelNode rightInput = logicalCorrelate.getRight();
+
+    final RelTraitSet leftPhysicalTraits = leftInput.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    final RelTraitSet rightPhysicalTraits = rightInput.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    final RelTraitSet outputTraits = leftPhysicalTraits.plus(DrillDistributionTrait.RANDOM_DISTRIBUTED);
+
+    final RelNode physicalLeft = convert(leftInput, leftPhysicalTraits);
+    final RelNode physicalRight = convert(rightInput, rightPhysicalTraits);
+
+    call.transformTo(new CorrelatePrel(logicalCorrelate.getCluster(),
+        outputTraits,
+        physicalLeft, physicalRight, logicalCorrelate.getCorrelationId(),
+        logicalCorrelate.getRequiredColumns(), logicalCorrelate.getJoinType()));
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java new file mode 100644 index 0000000..1e87305 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Physical ({@link Prel}) unnest node. It has no inputs and wraps an {@link UnnestPOP} that is
+ * created once, in the constructor, from the name of the referenced field.
+ */
+public class UnnestPrel extends DrillUnnestRelBase implements Prel {
+
+  protected final UnnestPOP unnestPOP;
+
+  /**
+   * @param cluster cluster the node belongs to
+   * @param traits traits of the node
+   * @param rowType row type reported by the node
+   * @param ref expression to unnest; must be a {@link RexFieldAccess}
+   * @throws IllegalArgumentException if {@code ref} is not a {@link RexFieldAccess}
+   */
+  public UnnestPrel(RelOptCluster cluster, RelTraitSet traits,
+      RelDataType rowType, RexNode ref) {
+    super(cluster, traits, ref);
+    this.unnestPOP = new UnnestPOP(null, SchemaPath.getSimplePath(fieldNameOf(ref)));
+    this.rowType = rowType;
+  }
+
+  /** Returns the accessed field's name, rejecting anything that is not a field access. */
+  private static String fieldNameOf(RexNode ref) {
+    // Previously a blind cast: any other expression surfaced as a bare ClassCastException.
+    if (!(ref instanceof RexFieldAccess)) {
+      throw new IllegalArgumentException("Unnest requires a field access expression but got: " + ref);
+    }
+    return ((RexFieldAccess) ref).getField().getName();
+  }
+
+  /** This node has no child {@link Prel}s. */
+  @Override
+  public Iterator<Prel> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  /** Dispatches to the visitor's generic {@code visitPrel}. */
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
+    return visitor.visitPrel(this, value);
+  }
+
+  /** Returns the operator built in the constructor, after {@code creator.addMetadata}. */
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
+      throws IOException {
+    return creator.addMetadata(this, unnestPOP);
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+    return BatchSchema.SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public BatchSchema.SelectionVectorMode getEncoding() {
+    return BatchSchema.SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return true;
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrule.java new file mode 100644 index 0000000..765304d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.config.UnnestPOP;
+import org.apache.drill.exec.planner.logical.DrillUnnestRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+/**
+ * Planner rule that turns a {@link DrillUnnestRel} into an {@link UnnestPrel} with the same
+ * row type and reference expression, in the {@link Prel#DRILL_PHYSICAL} convention.
+ */
+public class UnnestPrule extends Prule {
+  public static final RelOptRule INSTANCE = new UnnestPrule();
+
+  private UnnestPrule() {
+    super(RelOptHelper.any(DrillUnnestRel.class), "UnnestPrule");
+  }
+
+  /**
+   * Emits the physical unnest for the matched node. Nothing is emitted when the reference is
+   * not a {@link RexFieldAccess}.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillUnnestRel unnest = call.rel(0);
+    final RexNode ref = unnest.getRef();
+    // The old instanceof block computed a field name and threw it away. UnnestPrel derives its
+    // column from a RexFieldAccess, so any other expression cannot be converted by this rule.
+    if (!(ref instanceof RexFieldAccess)) {
+      return;
+    }
+
+    UnnestPrel unnestPrel = new UnnestPrel(unnest.getCluster(),
+        unnest.getTraitSet().plus(Prel.DRILL_PHYSICAL), unnest.getRowType(), ref);
+
+    call.transformTo(unnestPrel);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPhysicalPlan.java new file mode 100644 index 0000000..09d85d4 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPhysicalPlan.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.lateraljoin;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.drill.test.BaseTestQuery;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests for lateral join / unnest: one hand-written physical plan and several SQL queries over
+ * the {@code lateraljoin/nested-customer} fixtures.
+ */
+public class TestLateralPhysicalPlan extends BaseTestQuery {
+
+  /** Runs the hand-written physical plan and checks the number of records it returns. */
+  @Test
+  public void testLateralPlan1() throws Exception {
+    int numOutputRecords = testPhysical(getFile("lateraljoin/lateralplan1.json"));
+    // JUnit's contract is assertEquals(expected, actual); the arguments used to be swapped,
+    // which produces a misleading message when the assertion fails.
+    assertEquals(12, numOutputRecords);
+  }
+
+  @Test
+  @Ignore("To be fixed")
+  public void testLateralSqlStar() throws Exception {
+    String sql = "select * from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2 limit 1";
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(sql)
+        .baselineColumns("c_name", "o_shop")
+        .baselineValues("customer1", "Meno Park 1st")
+        .go();
+
+  }
+
+  @Test
+  public void testLateralSql() throws Exception {
+    String sql = "select t.c_name, t2.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) t2 limit 1";
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(sql)
+        .baselineColumns("c_name", "o_shop")
+        .baselineValues("customer1", "Meno Park 1st")
+        .go();
+
+  }
+
+  @Test
+  @Ignore("naming of single column")
+  public void testLateralSqlPlainCol() throws Exception {
+    String sql = "select t.c_name, t2.c_phone from cp.`lateraljoin/nested-customer.json` t, unnest(t.c_phone) t2 limit 1";
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(sql)
+        .baselineColumns("c_name", "c_phone_flat")
+        .baselineValues("customer1", "6505200001")
+        .go();
+
+  }
+
+  @Test
+  @Ignore("To be fixed")
+  public void testLateralSqlWithAS() throws Exception {
+    String sql = "select t.c_name, t2.o_shop from cp.`lateraljoin/nested-customer.parquet` t, unnest(t.orders) as t2(o_shop) limit 1";
+    testBuilder()
+        .unOrdered()
+        .sqlQuery(sql)
+        .baselineColumns("c_name", "o_shop")
+        .baselineValues("customer1", "Meno Park 1st")
+        .go();
+
+  }
+
+  /** Only checks that the query runs; its result is not verified. */
+  @Test
+  public void testSubQuerySql() throws Exception {
+    String sql = "select t2.os.* from (select t.orders as os from cp.`lateraljoin/nested-customer.parquet` t) t2";
+    test(sql);
+  }
+}
diff --git a/exec/java-exec/src/test/resources/lateraljoin/lateralplan1.json b/exec/java-exec/src/test/resources/lateraljoin/lateralplan1.json new file mode 100644 index 0000000..9be5878 --- /dev/null +++ b/exec/java-exec/src/test/resources/lateraljoin/lateralplan1.json @@ -0,0 +1,95 @@ +{ + "head" : { + "version" : 1, + "generator" : { + "type" : "DefaultSqlHandler", + "info" : "" + }, + "type" : "APACHE_DRILL_PHYSICAL", + "options" : [ { + "kind" : "DOUBLE", + "accessibleScopes" : "ALL", + "name" : "planner.index.noncovering_selectivity_threshold", + "float_val" : 0.25, + "scope" : "SESSION" + } ], + "queue" : 0, + "hasResourcePlan" : false, + "resultMode" : "EXEC" + }, + "graph" : [ { + "pop" : "parquet-scan", + "@id" : 5, + "userName" : "root", + "entries" : [ { + "path" : "lateraljoin/nested-customer.parquet" + } ], + "storage" : { + "type" : "file", + "enabled" : true, + "connection" : "classpath:///", + "formats" : { + "json" : { + "type" : "json" + }, + "parquet" : { + "type" : "parquet" + } + } + }, + "format" : { + "type" : "parquet" + }, + "cost" : 1000.0 + }, { + "pop" : "unnest", + "@id" : 7, + "userName" : "root", + "column" : "`orders`", + "cost" : 1000.0 + }, { + "pop" : "project", + "@id" : 6, + "exprs" : [ { + "ref" : "`ITEM`", + "expr" : "`o_shop`" + }], + "child" : 7, + "outputProj" : false, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 1000.0 + }, { + "pop" : "lateral-join", + "@id" : 4, + "left" : 5, +
"right" : 6, + "unnestForLateralJoin": 7, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 1000.0 + }, { + "pop" : "project", + "@id" : 3, + "exprs" : [ { + "ref" : "`ITEM`", + "expr" : "`c_name`" + }, { + "ref" : "`ITEM1`", + "expr" : "`o_shop`" + } ], + "child" : 4, + "outputProj" : false, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 1000.0 + }, { + "pop" : "screen", + "@id" : 0, + "child" : 3, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "cost" : 1000.0 + } ] +} + diff --git a/exec/java-exec/src/test/resources/lateraljoin/nested-customer.json b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.json new file mode 100644 index 0000000..710ca42 --- /dev/null +++ b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.json @@ -0,0 +1,114 @@ +{ + "c_name" : "customer1", + "c_id" : 1, + "c_phone" : ["6505200001", "4085201234", "6125205678"], + "orders" : [{"o_id": 1, "o_shop": "Meno Park 1st", "o_amount": 4.5, + "items" : [ {"i_name" : "paper towel", "i_number": 2, "i_supplier": "oregan"}, + {"i_name" : "map", "i_number": 1, "i_supplier": "washington"}, + {"i_name" : "cheese", "i_number": 9, "i_supplier": "california"} + ] + + }, + {"o_id": 2, "o_shop": "Mountain View 1st", "o_amount": 104.5, + "items" : [ {"i_name" : "beef", "i_number": 3, "i_supplier": "montana"}, + {"i_name" : "tooth paste", "i_number": 4, "i_supplier": "washington"}, + {"i_name" : "hat", "i_number": 7, "i_supplier": "california"} + ] + + }, + {"o_id": 3, "o_shop": "Sunnyvale 1st", "o_amount": 294.5, + "items" : [ {"i_name" : "paper towel", "i_number": 5, "i_supplier": "oregan"}, + {"i_name" : "tooth paste", "i_number": 6, "i_supplier": "washington"}, + {"i_name" : "cheese", "i_number": 8, "i_supplier": "california"} + ] + } + ], + "c_address" : "bay area, CA" +} +{ + "c_name" : "customer2", + "c_id" : 2, + "c_phone" : ["1505200001", "7085201234", "2125205678"], + "orders" : [{"o_id": 10, 
"o_shop": "Mountain View 1st", "o_amount": 724.5, + "items" : [ {"i_name" : "beef", "i_number": 12, "i_supplier": "montana"}, + {"i_name" : "tooth paste", "i_number": 11, "i_supplier": "washington"}, + {"i_name" : "hat", "i_number": 10, "i_supplier": "california"} + ] + + }, + + {"o_id": 11, "o_shop": "Sunnyvale 1st", "o_amount": 179.5, + "items" : [ {"i_name" : "paper towel", "i_number": 13, "i_supplier": "oregan"}, + {"i_name" : "tooth paste", "i_number": 14, "i_supplier": "washington"}, + {"i_name" : "cheese", "i_number": 15, "i_supplier": "california"} + ] + }, + {"o_id": 12, "o_shop": "Meno Park 1st", "o_amount": 80.0, + "items" : [ {"i_name" : "paper towel", "i_number": 13, "i_supplier": "oregan"}, + {"i_name" : "tooth paste", "i_number": 14, "i_supplier": "washington"}, + {"i_name" : "cheese", "i_number": 15, "i_supplier": "california"} + ] + } + ], + "c_address" : "LA, CA" +} +{ + "c_name" : "customer3", + "c_id" : 3, + "c_phone" : ["1205200001", "7285201234", "2325205678"], + "orders" : [{"o_id": 21, "o_shop": "Meno Park 1st", "o_amount": 192.5, + "items" : [ {"i_name" : "beef", "i_number": 22, "i_supplier": "montana"}, + {"i_name" : "tooth paste", "i_number": 21, "i_supplier": "washington"}, + {"i_name" : "hat", "i_number": 20, "i_supplier": "california"} + ] + + }, + + {"o_id": 22, "o_shop": "Mountain View 1st", "o_amount": 680.9, + "items" : [ {"i_name" : "paper towel", "i_number": 23, "i_supplier": "oregan"}, + {"i_name" : "tooth paste", "i_number": 24, "i_supplier": "washington"}, + {"i_name" : "cheese", "i_number": 25, "i_supplier": "california"} + ] + }, + + {"o_id": 23, "o_shop": "Sunnyvale 1st", "o_amount": 772.2, + "items" : [ {"i_name" : "paper towel", "i_number": 26, "i_supplier": "oregan"}, + {"i_name" : "tooth paste", "i_number": 27, "i_supplier": "washington"}, + {"i_name" : "cheese", "i_number": 28, "i_supplier": "california"} + ] + } + + ], + "c_address" : "bay area, CA" +} +{ + "c_name" : "customer4", + "c_id" : 4, + "c_phone" : 
["6509200001", "4088201234", "6127205678"], + "orders" : [{"o_id": 30, "o_shop": "Mountain View 1st", "o_amount": 870.2, + "items" : [ {"i_name" : "beef", "i_number": 32, "i_supplier": "montana"}, + {"i_name" : "tooth paste", "i_number": 31, "i_supplier": "washington"}, + {"i_name" : "hat", "i_number": 30, "i_supplier": "california"} + ] + + }, + + {"o_id": 31, "o_shop": "Sunnyvale 1st", "o_amount": 970.5, + "items" : [ {"i_name" : "beef", "i_number": 32, "i_supplier": "montana"}, + {"i_name" : "tooth paste", "i_number": 31, "i_supplier": "washington"}, + {"i_name" : "cheese", "i_number": 30, "i_supplier": "california"} + ] + + }, + + {"o_id": 32, "o_shop": "Meno Park 1st", "o_amount": 1030.1, + "items" : [ {"i_name" : "paper towel", "i_number": 36, "i_supplier": "oregan"}, + {"i_name" : "tooth paste", "i_number": 37, "i_supplier": "washington"}, + {"i_name" : "cheese", "i_number": 38, "i_supplier": "california"} + ] + } + + ], + "c_address" : "LA, CA" +} + diff --git a/exec/java-exec/src/test/resources/lateraljoin/nested-customer.parquet b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.parquet new file mode 100644 index 0000000..97d898a Binary files /dev/null and b/exec/java-exec/src/test/resources/lateraljoin/nested-customer.parquet differ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/logical/src/main/java/org/apache/drill/common/logical/data/LateralJoin.java similarity index 50% copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java copy to logical/src/main/java/org/apache/drill/common/logical/data/LateralJoin.java index 946b4a6..b2c78a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java +++ b/logical/src/main/java/org/apache/drill/common/logical/data/LateralJoin.java @@ -15,41 +15,46 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - -package org.apache.drill.exec.physical.config; +package org.apache.drill.common.logical.data; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.base.Preconditions; -import org.apache.calcite.rel.core.JoinRelType; -import org.apache.drill.exec.physical.base.AbstractJoinPop; -import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; +import com.google.common.collect.Iterators; +import org.apache.drill.common.logical.data.visitors.LogicalVisitor; +import java.util.Iterator; import java.util.List; @JsonTypeName("lateral-join") -public class LateralJoinPOP extends AbstractJoinPop { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class); +public class LateralJoin extends LogicalOperatorBase { + private final LogicalOperator left; + private final LogicalOperator right; @JsonCreator - public LateralJoinPOP( - @JsonProperty("left") PhysicalOperator left, - @JsonProperty("right") PhysicalOperator right, - @JsonProperty("joinType") JoinRelType joinType) { - super(left, right, joinType, null, null); + public LateralJoin(@JsonProperty("left") LogicalOperator left, @JsonProperty("right") LogicalOperator right) { + super(); + this.left = left; + this.right = right; + left.registerAsSubscriber(this); + right.registerAsSubscriber(this); + } + + public LogicalOperator getLeft() { + return left; + } + + public LogicalOperator getRight() { + return right; } @Override - public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { - Preconditions.checkArgument(children.size() == 2, - "Lateral join should have two physical operators"); - return new LateralJoinPOP(children.get(0), children.get(1), joinType); + public Iterator<LogicalOperator> iterator() { + return Iterators.forArray(getLeft(), getRight()); } @Override - 
public int getOperatorType() { - return CoreOperatorType.LATERAL_JOIN_VALUE; + public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E { + return logicalVisitor.visitLateralJoin(this, value); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java b/logical/src/main/java/org/apache/drill/common/logical/data/Unnest.java similarity index 51% copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java copy to logical/src/main/java/org/apache/drill/common/logical/data/Unnest.java index 6ed593d..a70e8cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatchCreator.java +++ b/logical/src/main/java/org/apache/drill/common/logical/data/Unnest.java @@ -15,20 +15,31 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.drill.exec.physical.impl.join; +package org.apache.drill.common.logical.data; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.ExecutorFragmentContext; -import org.apache.drill.exec.physical.config.LateralJoinPOP; -import org.apache.drill.exec.physical.impl.BatchCreator; -import org.apache.drill.exec.record.RecordBatch; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.data.visitors.LogicalVisitor; -import java.util.List; +@JsonTypeName("unnest") +public class Unnest extends SingleInputOperator { + + private final SchemaPath column; + + @JsonCreator + public Unnest(@JsonProperty("column") SchemaPath column) { + this.column = column; + } + + public SchemaPath getColumn() { + return column; + } -public class LateralJoinBatchCreator 
implements BatchCreator<LateralJoinPOP> { @Override - public LateralJoinBatch getBatch(ExecutorFragmentContext context, LateralJoinPOP config, List<RecordBatch> children) - throws ExecutionSetupException { - return new LateralJoinBatch(config, context, children.get(0), children.get(1)); + public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E { + return logicalVisitor.visitUnnest(this, value); } + } diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java index a04c3a4..4fd64c5 100644 --- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java +++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java @@ -17,6 +17,8 @@ */ package org.apache.drill.common.logical.data.visitors; +import org.apache.drill.common.logical.data.LateralJoin; +import org.apache.drill.common.logical.data.Unnest; import org.apache.drill.common.logical.data.Values; import org.apache.drill.common.logical.data.Filter; import org.apache.drill.common.logical.data.Flatten; @@ -123,4 +125,14 @@ public abstract class AbstractLogicalVisitor<T, X, E extends Throwable> implemen public T visitWriter(Writer writer, X value) throws E { return visitOp(writer, value); } + + @Override + public T visitUnnest(Unnest unnest, X value) throws E { + return visitOp(unnest, value); + } + + @Override + public T visitLateralJoin(LateralJoin lateralJoin, X value) throws E { + return visitOp(lateralJoin, value); + } } diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java index 5b5ca5f..55fa838 100644 --- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java +++ 
b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java @@ -18,6 +18,8 @@ package org.apache.drill.common.logical.data.visitors; +import org.apache.drill.common.logical.data.LateralJoin; +import org.apache.drill.common.logical.data.Unnest; import org.apache.drill.common.logical.data.Values; import org.apache.drill.common.logical.data.Filter; import org.apache.drill.common.logical.data.Flatten; @@ -63,4 +65,7 @@ public interface LogicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitUnion(Union union, EXTRA value) throws EXCEP; public RETURN visitWindow(Window window, EXTRA value) throws EXCEP; public RETURN visitWriter(Writer writer, EXTRA value) throws EXCEP; + + public RETURN visitUnnest(Unnest unnest, EXTRA value) throws EXCEP; + public RETURN visitLateralJoin(LateralJoin lateralJoin, EXTRA value) throws EXCEP; } -- To stop receiving notification emails like this one, please contact [email protected].
