[
https://issues.apache.org/jira/browse/DRILL-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498874#comment-16498874
]
ASF GitHub Bot commented on DRILL-6456:
---------------------------------------
parthchandra closed pull request #1299: DRILL-6456: Planner shouldn't create
any exchanges on the right side …
URL: https://github.com/apache/drill/pull/1299
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index 7bfe21483a..b4ed5e0e63 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -19,8 +19,8 @@
import java.util.Collections;
import java.util.List;
-
import org.apache.drill.exec.planner.fragment.DistributionAffinity;
+import org.apache.drill.exec.planner.physical.CorrelatePrel;
import org.apache.drill.exec.planner.physical.ExchangePrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ScanPrel;
@@ -28,9 +28,11 @@
import org.apache.calcite.rel.RelNode;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel,
ExcessiveExchangeIdentifier.MajorFragmentStat, RuntimeException> {
private final long targetSliceSize;
+ private CorrelatePrel topMostLateralJoin = null;
public ExcessiveExchangeIdentifier(long targetSliceSize) {
this.targetSliceSize = targetSliceSize;
@@ -45,18 +47,28 @@ public static Prel removeExcessiveEchanges(Prel prel, long
targetSliceSize) {
public Prel visitExchange(ExchangePrel prel, MajorFragmentStat parent)
throws RuntimeException {
parent.add(prel);
MajorFragmentStat newFrag = new MajorFragmentStat();
+ newFrag.setRightSideOfLateral(parent.isRightSideOfLateral());
Prel newChild = ((Prel) prel.getInput()).accept(this, newFrag);
-
- if (newFrag.isSingular() && parent.isSingular() &&
- // if one of them has strict distribution or none, we can remove the
exchange
- (!newFrag.isDistributionStrict() || !parent.isDistributionStrict())
- ) {
+ if (canRemoveExchange(parent, newFrag)) {
return newChild;
} else {
return (Prel) prel.copy(prel.getTraitSet(),
Collections.singletonList((RelNode) newChild));
}
}
+ private boolean canRemoveExchange(MajorFragmentStat parentFrag,
MajorFragmentStat childFrag) {
+ if (childFrag.isSingular() && parentFrag.isSingular() &&
+ (!childFrag.isDistributionStrict() ||
!parentFrag.isDistributionStrict())) {
+ return true;
+ }
+
+ if (parentFrag.isRightSideOfLateral()) {
+ return true;
+ }
+
+ return false;
+ }
+
@Override
public Prel visitScreen(ScreenPrel prel, MajorFragmentStat s) throws
RuntimeException {
s.addScreen(prel);
@@ -70,6 +82,40 @@ public Prel visitScan(ScanPrel prel, MajorFragmentStat s)
throws RuntimeExceptio
return prel;
}
+ @Override
+ public Prel visitCorrelate(CorrelatePrel prel, MajorFragmentStat s) throws
RuntimeException {
+ List<RelNode> children = Lists.newArrayList();
+ s.add(prel);
+
+ for (Prel p : prel) {
+ s.add(p);
+ }
+
+ // Traverse the left side of the Lateral join first. Left side of the
+ // Lateral shouldn't have any restrictions on Exchanges.
+ children.add(((Prel)prel.getInput(0)).accept(this, s));
+ // Save the outermost Lateral join so as to unset the flag later.
+ if (topMostLateralJoin == null) {
+ topMostLateralJoin = prel;
+ }
+
+ // Right side of the Lateral shouldn't have any Exchanges. Hence set the
+ // flag so that visitExchange removes the exchanges.
+ s.setRightSideOfLateral(true);
+ children.add(((Prel)prel.getInput(1)).accept(this, s));
+ if (topMostLateralJoin == prel) {
+ topMostLateralJoin = null;
+ s.setRightSideOfLateral(false);
+ }
+ return (Prel) prel.copy(prel.getTraitSet(), children);
+ }
+
+ @Override
+ public Prel visitUnnest(UnnestPrel prel, MajorFragmentStat s) throws
RuntimeException {
+ s.addUnnest(prel);
+ return prel;
+ }
+
@Override
public Prel visitPrel(Prel prel, MajorFragmentStat s) throws
RuntimeException {
List<RelNode> children = Lists.newArrayList();
@@ -98,6 +144,7 @@ public MajorFragmentStat getNewStat() {
private double maxRows = 0d;
private int maxWidth = Integer.MAX_VALUE;
private boolean isMultiSubScan = false;
+ private boolean rightSideOfLateral = false;
public void add(Prel prel) {
maxRows =
Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows);
@@ -130,9 +177,20 @@ public boolean isSingular() {
return w == 1;
}
+ public boolean isRightSideOfLateral() {
+ return this.rightSideOfLateral;
+ }
+
+ public void addUnnest(UnnestPrel prel) {
+ add(prel);
+ }
+
+ public void setRightSideOfLateral(boolean rightSideOfLateral) {
+ this.rightSideOfLateral = rightSideOfLateral;
+ }
+
public boolean isDistributionStrict() {
return distributionAffinity == DistributionAffinity.HARD;
}
}
-
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 9e19729f6d..00ab971e15 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -18,10 +18,13 @@
package org.apache.drill.exec.physical.impl.lateraljoin;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import org.apache.drill.PlanTestBase;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
@@ -30,10 +33,18 @@
import org.junit.Test;
import org.junit.Ignore;
+import java.nio.file.Paths;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public class TestLateralPlans extends BaseTestQuery {
+ private static final String regularTestFile_1 = "cust_order_10_1.json";
+ private static final String regularTestFile_2 = "cust_order_10_2.json";
@BeforeClass
public static void enableUnnestLateral() throws Exception {
+ dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin",
"multipleFiles", regularTestFile_1));
+ dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin",
"multipleFiles", regularTestFile_2));
test("alter session set `planner.enable_unnest_lateral`=true");
}
@@ -255,7 +266,7 @@ public void testUnnestTableAndColumnAlias() throws
Exception {
.sql(Sql)
.run();
} catch (UserRemoteException ex) {
- assert(ex.getMessage().contains("Alias table and column name are
required for UNNEST"));
+ assertTrue(ex.getMessage().contains("Alias table and column name are
required for UNNEST"));
}
}
@@ -272,7 +283,156 @@ public void testUnnestColumnAlias() throws Exception {
.sql(Sql)
.run();
} catch (UserRemoteException ex) {
- assert(ex.getMessage().contains("Alias table and column name are
required for UNNEST"));
+ assertTrue(ex.getMessage().contains("Alias table and column name are
required for UNNEST"));
+ }
+ }
+
+
/***********************************************************************************************
+ Following test cases are introduced to make sure no exchanges are present
on right side of
+ Lateral join.
+
**********************************************************************************************/
+
+ @Test
+ public void testNoExchangeWithAggWithoutGrpBy() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles`
t," +
+ " lateral ( select sum(t2.ord.o_totalprice) as totalprice from
unnest(t.c_orders) t2(ord)) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ @Test
+ public void testNoExchangeWithStreamAggWithGrpBy() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles`
t," +
+ " lateral ( select sum(t2.ord.o_totalprice) as totalprice from
unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+ .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), false)
+ .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), true);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
}
}
+
+ @Test
+ public void testNoExchangeWithHashAggWithGrpBy() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles`
t," +
+ " lateral ( select sum(t2.ord.o_totalprice) as totalprice from
unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+ .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), true)
+ .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(),
false);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ @Test
+ public void testNoExchangeWithOrderByWithoutLimit() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles`
t," +
+ " lateral ( select t2.ord.o_totalprice as totalprice from
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ @Test
+ public void testNoExchangeWithOrderByLimit() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles`
t," +
+ " lateral ( select t2.ord.o_totalprice as totalprice from
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+
+ @Test
+ public void testNoExchangeWithLateralsDownStreamJoin() throws Exception {
+ String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t,
dfs.`lateraljoin/multipleFiles` t2, " +
+ " lateral ( select t2.ord.o_totalprice as totalprice from
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1" +
+ " where t.c_name = t2.c_name";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ @Test
+ public void testNoExchangeWithLateralsDownStreamUnion() throws Exception {
+ String Sql = "select t.c_name from dfs.`lateraljoin/multipleFiles` t union
all " +
+ " select t.c_name from dfs.`lateraljoin/multipleFiles` t, " +
+ " lateral ( select t2.ord.o_totalprice as totalprice from
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ @Test
+ public void testNoExchangeWithLateralsDownStreamAgg() throws Exception {
+ String Sql = "select sum(d1.totalprice) from
dfs.`lateraljoin/multipleFiles` t, " +
+ " lateral ( select t2.ord.o_totalprice as totalprice from
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1 group by
t.c_custkey";
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+ .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), false)
+ .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), true);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String explain = client.queryBuilder().sql(Sql).explainText();
+ String rightChild = getRightChildOfLateral(explain);
+ assertFalse(rightChild.contains("Exchange"));
+ }
+ }
+
+ private String getRightChildOfLateral(String explain) throws Exception {
+ Matcher matcher = Pattern.compile("Correlate.*Unnest", Pattern.MULTILINE |
Pattern.DOTALL).matcher(explain);
+ assertTrue (matcher.find());
+ String CorrelateUnnest = matcher.group(0);
+ return CorrelateUnnest.substring(CorrelateUnnest.lastIndexOf("Scan"));
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Planner shouldn't create any exchanges on the right side of Lateral Join.
> -------------------------------------------------------------------------
>
> Key: DRILL-6456
> URL: https://issues.apache.org/jira/browse/DRILL-6456
> Project: Apache Drill
> Issue Type: Bug
> Components: Query Planning & Optimization
> Affects Versions: 1.14.0
> Reporter: Hanumath Rao Maduri
> Assignee: Hanumath Rao Maduri
> Priority: Major
> Labels: ready-to-commit
> Fix For: 1.14.0
>
>
> Currently, there is no restriction placed on right side of the LateralJoin.
> This is causing planner to generate an Exchange when there are operators like
> (Agg, Limit, Sort etc).
> Due to this unnest operator cannot retrieve the row from lateral's left side
> to process the pipeline further. Enhance the planner to not generate
> exchanges on the right side of the LateralJoin.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)