[ 
https://issues.apache.org/jira/browse/DRILL-6456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498874#comment-16498874
 ] 

ASF GitHub Bot commented on DRILL-6456:
---------------------------------------

parthchandra closed pull request #1299: DRILL-6456: Planner shouldn't create 
any exchanges on the right side …
URL: https://github.com/apache/drill/pull/1299
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index 7bfe21483a..b4ed5e0e63 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -19,8 +19,8 @@
 
 import java.util.Collections;
 import java.util.List;
-
 import org.apache.drill.exec.planner.fragment.DistributionAffinity;
+import org.apache.drill.exec.planner.physical.CorrelatePrel;
 import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
@@ -28,9 +28,11 @@
 import org.apache.calcite.rel.RelNode;
 
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
 
 public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, 
ExcessiveExchangeIdentifier.MajorFragmentStat, RuntimeException> {
   private final long targetSliceSize;
+  private CorrelatePrel topMostLateralJoin = null;
 
   public ExcessiveExchangeIdentifier(long targetSliceSize) {
     this.targetSliceSize = targetSliceSize;
@@ -45,18 +47,28 @@ public static Prel removeExcessiveEchanges(Prel prel, long 
targetSliceSize) {
   public Prel visitExchange(ExchangePrel prel, MajorFragmentStat parent) 
throws RuntimeException {
     parent.add(prel);
     MajorFragmentStat newFrag = new MajorFragmentStat();
+    newFrag.setRightSideOfLateral(parent.isRightSideOfLateral());
     Prel newChild = ((Prel) prel.getInput()).accept(this, newFrag);
-
-    if (newFrag.isSingular() && parent.isSingular() &&
-        // if one of them has strict distribution or none, we can remove the 
exchange
-        (!newFrag.isDistributionStrict() || !parent.isDistributionStrict())
-        ) {
+    if (canRemoveExchange(parent, newFrag)) {
       return newChild;
     } else {
       return (Prel) prel.copy(prel.getTraitSet(), 
Collections.singletonList((RelNode) newChild));
     }
   }
 
+  private boolean canRemoveExchange(MajorFragmentStat parentFrag, 
MajorFragmentStat childFrag) {
+    if (childFrag.isSingular() && parentFrag.isSingular() &&
+       (!childFrag.isDistributionStrict() || 
!parentFrag.isDistributionStrict())) {
+      return true;
+    }
+
+    if (parentFrag.isRightSideOfLateral()) {
+      return true;
+    }
+
+    return false;
+  }
+
   @Override
   public Prel visitScreen(ScreenPrel prel, MajorFragmentStat s) throws 
RuntimeException {
     s.addScreen(prel);
@@ -70,6 +82,40 @@ public Prel visitScan(ScanPrel prel, MajorFragmentStat s) 
throws RuntimeExceptio
     return prel;
   }
 
+  @Override
+  public Prel visitCorrelate(CorrelatePrel prel, MajorFragmentStat s) throws 
RuntimeException {
+    List<RelNode> children = Lists.newArrayList();
+    s.add(prel);
+
+    for (Prel p : prel) {
+      s.add(p);
+    }
+
+    // Traverse the left side of the Lateral join first. Left side of the
+    // Lateral shouldn't have any restrictions on Exchanges.
+    children.add(((Prel)prel.getInput(0)).accept(this, s));
+    // Save the outermost Lateral join so as to unset the flag later.
+    if (topMostLateralJoin == null) {
+      topMostLateralJoin = prel;
+    }
+
+    // Right side of the Lateral shouldn't have any Exchanges. Hence set the
+    // flag so that visitExchange removes the exchanges.
+    s.setRightSideOfLateral(true);
+    children.add(((Prel)prel.getInput(1)).accept(this, s));
+    if (topMostLateralJoin == prel) {
+      topMostLateralJoin = null;
+      s.setRightSideOfLateral(false);
+    }
+    return (Prel) prel.copy(prel.getTraitSet(), children);
+  }
+
+  @Override
+  public Prel visitUnnest(UnnestPrel prel, MajorFragmentStat s) throws 
RuntimeException {
+    s.addUnnest(prel);
+    return prel;
+  }
+
   @Override
   public Prel visitPrel(Prel prel, MajorFragmentStat s) throws 
RuntimeException {
     List<RelNode> children = Lists.newArrayList();
@@ -98,6 +144,7 @@ public MajorFragmentStat getNewStat() {
     private double maxRows = 0d;
     private int maxWidth = Integer.MAX_VALUE;
     private boolean isMultiSubScan = false;
+    private boolean rightSideOfLateral = false;
 
     public void add(Prel prel) {
       maxRows = 
Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows);
@@ -130,9 +177,20 @@ public boolean isSingular() {
       return w == 1;
     }
 
+    public boolean isRightSideOfLateral() {
+      return this.rightSideOfLateral;
+    }
+
+    public void addUnnest(UnnestPrel prel) {
+      add(prel);
+    }
+
+    public void setRightSideOfLateral(boolean rightSideOfLateral) {
+      this.rightSideOfLateral = rightSideOfLateral;
+    }
+
     public boolean isDistributionStrict() {
       return distributionAffinity == DistributionAffinity.HARD;
     }
   }
-
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 9e19729f6d..00ab971e15 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -18,10 +18,13 @@
 package org.apache.drill.exec.physical.impl.lateraljoin;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
@@ -30,10 +33,18 @@
 import org.junit.Test;
 import org.junit.Ignore;
 
+import java.nio.file.Paths;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 public class TestLateralPlans extends BaseTestQuery {
+  private static final String regularTestFile_1 = "cust_order_10_1.json";
+  private static final String regularTestFile_2 = "cust_order_10_2.json";
 
   @BeforeClass
   public static void enableUnnestLateral() throws Exception {
+    dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", 
"multipleFiles", regularTestFile_1));
+    dirTestWatcher.copyResourceToRoot(Paths.get("lateraljoin", 
"multipleFiles", regularTestFile_2));
     test("alter session set `planner.enable_unnest_lateral`=true");
   }
 
@@ -255,7 +266,7 @@ public void testUnnestTableAndColumnAlias() throws 
Exception {
           .sql(Sql)
           .run();
     } catch (UserRemoteException ex) {
-      assert(ex.getMessage().contains("Alias table and column name are 
required for UNNEST"));
+      assertTrue(ex.getMessage().contains("Alias table and column name are 
required for UNNEST"));
     }
   }
 
@@ -272,7 +283,156 @@ public void testUnnestColumnAlias() throws Exception {
           .sql(Sql)
           .run();
     } catch (UserRemoteException ex) {
-      assert(ex.getMessage().contains("Alias table and column name are 
required for UNNEST"));
+      assertTrue(ex.getMessage().contains("Alias table and column name are 
required for UNNEST"));
+    }
+  }
+
+  
/***********************************************************************************************
+   Following test cases are introduced to make sure no exchanges are present 
on right side of
+   Lateral join.
+   
**********************************************************************************************/
+
+  @Test
+  public void testNoExchangeWithAggWithoutGrpBy() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` 
t," +
+            " lateral ( select sum(t2.ord.o_totalprice) as totalprice from 
unnest(t.c_orders) t2(ord)) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  @Test
+  public void testNoExchangeWithStreamAggWithGrpBy() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` 
t," +
+            " lateral ( select sum(t2.ord.o_totalprice) as totalprice from 
unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+            .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), false)
+            .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
     }
   }
+
+  @Test
+  public void testNoExchangeWithHashAggWithGrpBy() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` 
t," +
+            " lateral ( select sum(t2.ord.o_totalprice) as totalprice from 
unnest(t.c_orders) t2(ord) group by t2.ord.o_orderkey) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+            .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), true)
+            .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), 
false);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  @Test
+  public void testNoExchangeWithOrderByWithoutLimit() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` 
t," +
+            " lateral ( select t2.ord.o_totalprice as totalprice from 
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  @Test
+  public void testNoExchangeWithOrderByLimit() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` 
t," +
+            " lateral ( select t2.ord.o_totalprice as totalprice from 
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+
+  @Test
+  public void testNoExchangeWithLateralsDownStreamJoin() throws Exception {
+    String Sql = "select d1.totalprice from dfs.`lateraljoin/multipleFiles` t, 
dfs.`lateraljoin/multipleFiles` t2, " +
+            " lateral ( select t2.ord.o_totalprice as totalprice from 
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1" +
+            " where t.c_name = t2.c_name";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  @Test
+  public void testNoExchangeWithLateralsDownStreamUnion() throws Exception {
+    String Sql = "select t.c_name from dfs.`lateraljoin/multipleFiles` t union 
all " +
+            " select t.c_name from dfs.`lateraljoin/multipleFiles` t, " +
+                    " lateral ( select t2.ord.o_totalprice as totalprice from 
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  @Test
+  public void testNoExchangeWithLateralsDownStreamAgg() throws Exception {
+    String Sql = "select sum(d1.totalprice) from 
dfs.`lateraljoin/multipleFiles` t, " +
+            " lateral ( select t2.ord.o_totalprice as totalprice from 
unnest(t.c_orders) t2(ord) order by t2.ord.o_orderkey limit 10) d1 group by 
t.c_custkey";
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+            .setOptionDefault(ExecConstants.ENABLE_UNNEST_LATERAL_KEY, true)
+            .setOptionDefault(ExecConstants.SLICE_TARGET, 1)
+            .setOptionDefault(PlannerSettings.HASHAGG.getOptionName(), false)
+            .setOptionDefault(PlannerSettings.STREAMAGG.getOptionName(), true);
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      String explain = client.queryBuilder().sql(Sql).explainText();
+      String rightChild = getRightChildOfLateral(explain);
+      assertFalse(rightChild.contains("Exchange"));
+    }
+  }
+
+  private String getRightChildOfLateral(String explain) throws Exception {
+    Matcher matcher = Pattern.compile("Correlate.*Unnest", Pattern.MULTILINE | 
Pattern.DOTALL).matcher(explain);
+    assertTrue (matcher.find());
+    String CorrelateUnnest = matcher.group(0);
+    return CorrelateUnnest.substring(CorrelateUnnest.lastIndexOf("Scan"));
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Planner shouldn't create any exchanges on the right side of Lateral Join.
> -------------------------------------------------------------------------
>
>                 Key: DRILL-6456
>                 URL: https://issues.apache.org/jira/browse/DRILL-6456
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning &amp; Optimization
>    Affects Versions: 1.14.0
>            Reporter: Hanumath Rao Maduri
>            Assignee: Hanumath Rao Maduri
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>
> Currently, there is no restriction placed on right side of the LateralJoin. 
> This is causing planner to generate an Exchange when there are operators like 
> (Agg, Limit, Sort etc). 
> Due to this unnest operator cannot retrieve the row from lateral's left side 
> to process the pipeline further. Enhance the planner to not generate 
> exchanges on the right side of the LateralJoin.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to