This is an automated email from the ASF dual-hosted git repository. sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 0d48f6ba1e0c794fde2e03292c8c5c85115266b7 Author: Hanumath Rao Maduri <[email protected]> AuthorDate: Thu Jul 26 15:48:10 2018 -0700 DRILL-6636: Planner side changes to use PartitionLimitBatch in place of LimitBatch. --- .../drill/exec/planner/physical/LimitPrel.java | 27 ++++++++++++++++--- .../impl/lateraljoin/TestE2EUnnestAndLateral.java | 31 +++++++++++++--------- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java index 5d3c6c6..057cfae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java @@ -17,9 +17,12 @@ */ package org.apache.drill.exec.planner.physical; +import org.apache.calcite.rel.RelWriter; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.Limit; +import org.apache.drill.exec.physical.config.PartitionLimit; import org.apache.drill.exec.planner.common.DrillLimitRelBase; +import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.calcite.rel.RelNode; @@ -33,6 +36,7 @@ import java.util.Iterator; import java.util.List; public class LimitPrel extends DrillLimitRelBase implements Prel { + private boolean isPartitioned = false; public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) { super(cluster, traitSet, child, offset, fetch); @@ -42,9 +46,14 @@ public class LimitPrel extends DrillLimitRelBase implements Prel { super(cluster, traitSet, child, offset, fetch, pushDown); } + public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch, boolean pushDown, boolean isPartitioned) { + super(cluster, traitSet, child, offset, fetch, pushDown); + this.isPartitioned = isPartitioned; + } + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown()); + return new LimitPrel(getCluster(), traitSet, sole(inputs), offset, fetch, isPushDown(), isPartitioned); } @Override @@ -60,7 +69,12 @@ public class LimitPrel extends DrillLimitRelBase implements Prel { // Null value implies including entire remaining result set from first offset Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null; - Limit limit = new Limit(childPOP, first, last); + Limit limit; + if (isPartitioned) { + limit = new PartitionLimit(childPOP, first, last, DrillRelOptUtil.IMPLICIT_COLUMN); + } else { + limit = new Limit(childPOP, first, last); + } return creator.addMetadata(this, limit); } @@ -75,6 +89,13 @@ public class LimitPrel extends DrillLimitRelBase implements Prel { } @Override + public RelWriter explainTerms(RelWriter pw) { + super.explainTerms(pw); + pw.itemIf("partitioned", isPartitioned, isPartitioned); + return pw; + } + + @Override public SelectionVectorMode[] getSupportedEncodings() { return SelectionVectorMode.NONE_AND_TWO; } @@ -91,6 +112,6 @@ public class LimitPrel extends DrillLimitRelBase implements Prel { @Override public Prel addImplicitRowIDCol(List<RelNode> children) { - return (Prel) this.copy(this.traitSet, children); + return new LimitPrel(this.getCluster(), this.traitSet, children.get(0), getOffset(), getFetch(), isPushDown(), true); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java index e5775bb..a39d960 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java @@ -82,30 +82,34 @@ public class TestE2EUnnestAndLateral extends ClusterTest { } @Test - @Ignore ("DRILL-6635") public void testLateral_WithTopNInSubQuery() throws Exception { + runAndLog("alter session set `planner.enable_topn`=false"); + String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " + "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " + "(SELECT t.ord.o_id as o_id, t.ord.o_amount as o_amount FROM UNNEST(customer.orders) t(ord) ORDER BY " + "o_amount DESC LIMIT 1) orders"; - testBuilder() - .sqlQuery(Sql) - .unOrdered() - .baselineColumns("c_name", "o_id", "o_amount") - .baselineValues("customer1", 3.0, 294.5) - .baselineValues("customer2", 10.0, 724.5) - .baselineValues("customer3", 23.0, 772.2) - .baselineValues("customer4", 32.0, 1030.1) - .go(); + try { + testBuilder() + .sqlQuery(Sql) + .unOrdered() + .baselineColumns("c_name", "o_id", "o_amount") + .baselineValues("customer1", 3.0, 294.5) + .baselineValues("customer2", 10.0, 724.5) + .baselineValues("customer3", 23.0, 772.2) + .baselineValues("customer4", 32.0, 1030.1) + .go(); + } finally { + runAndLog("alter session set `planner.enable_topn`=true"); + } } /** - * Test which disables the TopN operator from planner settings before running query using SORT and LIMIT in + * Test which disables the TopN operator from planner settintestLateral_WithTopNInSubQuerygs before running query using SORT and LIMIT in * subquery. The same query as in above test is executed and same result is expected. */ @Test - @Ignore ("DRILL-6635") public void testLateral_WithSortAndLimitInSubQuery() throws Exception { runAndLog("alter session set `planner.enable_topn`=false"); @@ -291,8 +295,9 @@ public class TestE2EUnnestAndLateral extends ClusterTest { } @Test - @Ignore ("DRILL-6635") public void testMultipleBatchesLateral_WithTopNInSubQuery() throws Exception { + runAndLog("alter session set `planner.enable_topn`=false"); + String sql = "SELECT customer.c_name, orders.o_orderkey, orders.o_totalprice " + "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " + "(SELECT t.ord.o_orderkey as o_orderkey, t.ord.o_totalprice as o_totalprice FROM UNNEST(customer.c_orders) t(ord)" +
