This is an automated email from the ASF dual-hosted git repository. boaz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 5a0c75f69166283a17179e80b97712bed57110d9 Author: HanumathRao <[email protected]> AuthorDate: Fri Jun 29 08:46:41 2018 -0700 DRILL-6475: Unnest: Null fieldId Pointer. closes #1381 --- .../visitor/AdjustOperatorsSchemaVisitor.java | 148 +++++++++++++++++++++ .../physical/visitor/JoinPrelRenameVisitor.java | 87 ------------ .../planner/sql/handlers/DefaultSqlHandler.java | 5 +- .../impl/lateraljoin/TestLateralPlans.java | 24 ++++ 4 files changed, 175 insertions(+), 89 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/AdjustOperatorsSchemaVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/AdjustOperatorsSchemaVisitor.java new file mode 100644 index 0000000..c46b725 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/AdjustOperatorsSchemaVisitor.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.physical.visitor; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.base.Preconditions; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.exec.planner.physical.JoinPrel; +import org.apache.drill.exec.planner.physical.LateralJoinPrel; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.calcite.rel.RelNode; + +import com.google.common.collect.Lists; +import org.apache.drill.exec.planner.physical.UnnestPrel; + +/** + * AdjustOperatorsSchemaVisitor visits operators and, depending upon each operator's functionality, + * adjusts its output row type. The adjusting mechanism is unique to each operator. In case of joins this visitor + * adjusts the field names to make sure that the upstream operator sees only unique field names even though + * the children of the join have the same field names. In case of lateral/unnest operators it changes the correlated + * field and also the unnest operator's output row type.
+ */ +public class AdjustOperatorsSchemaVisitor extends BasePrelVisitor<Prel, Void, RuntimeException>{ + + private Prel registeredPrel = null; + + private static AdjustOperatorsSchemaVisitor INSTANCE = new AdjustOperatorsSchemaVisitor(); + + public static Prel adjustSchema(Prel prel){ + return prel.accept(INSTANCE, null); + } + + private void register(Prel prel) { + this.registeredPrel = prel; + } + + private Prel getRegisteredPrel() { + return this.registeredPrel; + } + + @Override + public Prel visitPrel(Prel prel, Void value) throws RuntimeException { + return preparePrel(prel, getChildren(prel)); + } + + public void unRegister() { + this.registeredPrel = null; + } + + private List<RelNode> getChildren(Prel prel, int registerForChild) { + int ch = 0; + List<RelNode> children = Lists.newArrayList(); + for(Prel child : prel){ + if (ch == registerForChild) { + register(prel); + } + child = child.accept(this, null); + if (ch == registerForChild) { + unRegister(); + } + children.add(child); + ch++; + } + return children; + } + + private List<RelNode> getChildren(Prel prel) { + return getChildren(prel, -1); + } + + private Prel preparePrel(Prel prel, List<RelNode> renamedNodes) { + return (Prel) prel.copy(prel.getTraitSet(), renamedNodes); + } + + @Override + public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException { + + List<RelNode> children = getChildren(prel); + + final int leftCount = children.get(0).getRowType().getFieldCount(); + + List<RelNode> reNamedChildren = Lists.newArrayList(); + + RelNode left = prel.getJoinInput(0, children.get(0)); + RelNode right = prel.getJoinInput(leftCount, children.get(1)); + + reNamedChildren.add(left); + reNamedChildren.add(right); + + return preparePrel(prel, reNamedChildren); + } + + @Override + public Prel visitLateral(LateralJoinPrel prel, Void value) throws RuntimeException { + + List<RelNode> children = getChildren(prel, 1); + List<RelNode> reNamedChildren = new ArrayList<>(); + + for (int i = 0; i < 
children.size(); i++) { + reNamedChildren.add(prel.getLateralInput(i, children.get(i))); + } + + return preparePrel(prel, reNamedChildren); + } + + @Override + public Prel visitUnnest(UnnestPrel prel, Void value) throws RuntimeException { + Preconditions.checkArgument(registeredPrel != null && registeredPrel instanceof LateralJoinPrel); + Preconditions.checkArgument(prel.getRowType().getFieldCount() == 1); + RexBuilder builder = prel.getCluster().getRexBuilder(); + + LateralJoinPrel lateralJoinPrel = (LateralJoinPrel) getRegisteredPrel(); + int correlationIndex = lateralJoinPrel.getRequiredColumns().nextSetBit(0); + String correlationColumnName = lateralJoinPrel.getLeft().getRowType().getFieldNames().get(correlationIndex); + RexNode corrRef = builder.makeCorrel(lateralJoinPrel.getLeft().getRowType(), lateralJoinPrel.getCorrelationId()); + RexNode fieldAccess = builder.makeFieldAccess(corrRef, correlationColumnName, false); + + List<String> fieldNames = new ArrayList<>(); + List<RelDataType> fieldTypes = new ArrayList<>(); + for (RelDataTypeField field : prel.getRowType().getFieldList()) { + fieldNames.add(correlationColumnName); + fieldTypes.add(field.getType()); + } + + UnnestPrel unnestPrel = new UnnestPrel(prel.getCluster(), prel.getTraitSet(), + prel.getCluster().getTypeFactory().createStructType(fieldTypes, fieldNames), fieldAccess); + return unnestPrel; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java deleted file mode 100644 index 3a2529b..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.planner.physical.visitor; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.drill.exec.planner.physical.JoinPrel; -import org.apache.drill.exec.planner.physical.LateralJoinPrel; -import org.apache.drill.exec.planner.physical.Prel; -import org.apache.calcite.rel.RelNode; - -import com.google.common.collect.Lists; - -public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeException>{ - - private static JoinPrelRenameVisitor INSTANCE = new JoinPrelRenameVisitor(); - - public static Prel insertRenameProject(Prel prel){ - return prel.accept(INSTANCE, null); - } - - @Override - public Prel visitPrel(Prel prel, Void value) throws RuntimeException { - return preparePrel(prel, getChildren(prel)); - } - - private List<RelNode> getChildren(Prel prel) { - List<RelNode> children = Lists.newArrayList(); - for(Prel child : prel){ - child = child.accept(this, null); - children.add(child); - } - return children; - } - - private Prel preparePrel(Prel prel, List<RelNode> renamedNodes) { - return (Prel) prel.copy(prel.getTraitSet(), renamedNodes); - } - - @Override - public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException { - - List<RelNode> children = getChildren(prel); - - final int leftCount = 
children.get(0).getRowType().getFieldCount(); - - List<RelNode> reNamedChildren = Lists.newArrayList(); - - RelNode left = prel.getJoinInput(0, children.get(0)); - RelNode right = prel.getJoinInput(leftCount, children.get(1)); - - reNamedChildren.add(left); - reNamedChildren.add(right); - - return preparePrel(prel, reNamedChildren); - } - - //TODO: consolidate this code with join column renaming. - @Override - public Prel visitLateral(LateralJoinPrel prel, Void value) throws RuntimeException { - - List<RelNode> children = getChildren(prel); - List<RelNode> reNamedChildren = new ArrayList<>(); - - for (int i = 0; i < children.size(); i++) { - reNamedChildren.add(prel.getLateralInput(i, children.get(i))); - } - - return preparePrel(prel, reNamedChildren); - } -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 1e671ff..83e1a8f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -88,11 +88,11 @@ import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.explain.PrelSequencer; +import org.apache.drill.exec.planner.physical.visitor.AdjustOperatorsSchemaVisitor; import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor; import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier; import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer; import org.apache.drill.exec.planner.physical.visitor.InsertLocalExchangeVisitor; -import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor; import 
org.apache.drill.exec.planner.physical.visitor.MemoryEstimationVisitor; import org.apache.drill.exec.planner.physical.visitor.RelUniqifier; import org.apache.drill.exec.planner.physical.visitor.RewriteProjectToFlatten; @@ -512,8 +512,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler { * 2.) * Join might cause naming conflicts from its left and right child. * In such case, we have to insert Project to rename the conflicting names. + * Unnest operator might need to adjust the correlated field after the physical planning. */ - phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode); + phyRelNode = AdjustOperatorsSchemaVisitor.adjustSchema(phyRelNode); /* * 2.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count. diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java index 77d245f..222b036 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java @@ -358,6 +358,7 @@ public class TestLateralPlans extends BaseTestQuery { public void testNoExchangeWithStreamAggWithGrpBy() throws Exception { String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t," + " lateral ( select sum(t2.ord.o_totalprice) as totalprice from unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1"; + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true) .setOptionDefault(ExecConstants.SLICE_TARGET, 1) @@ -532,4 +533,27 @@ public class TestLateralPlans extends BaseTestQuery { plan, not(containsString("Sort"))); } } + + @Test + public void testMultiUnnestQuery() throws Exception { + String Sql = "SELECT t5.l_quantity FROM 
dfs.`lateraljoin/multipleFiles` t, " + + "LATERAL (SELECT t2.ordrs.o_lineitems FROM UNNEST(t.c_orders) t2(ordrs)) t3(lineitems), " + + "LATERAL (SELECT t4.lineitems.l_quantity FROM UNNEST(t3.lineitems) t4(lineitems)) t5(l_quantity) order by 1"; + + String baselineQuery = "select dt.lineitems.l_quantity as l_quantity from (select flatten(dt.orders.o_lineitems) as lineitems " + + "from (select flatten(c_orders) as orders from dfs.`lateraljoin/multipleFiles` t) dt)dt order by 1"; + + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true) + .setOptionDefault(ExecConstants.SLICE_TARGET, 1); + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + client.testBuilder() + .ordered() + .sqlBaselineQuery(baselineQuery) + .sqlQuery(Sql) + .go(); + } + } }
