This is an automated email from the ASF dual-hosted git repository. timothyfarkas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 024fd8aa43e413bfa5a0f84f6c66b5402788b2bc Author: Hanumath Rao Maduri <[email protected]> AuthorDate: Wed Aug 1 15:41:29 2018 -0700 DRILL-6645: Transform TopN in Lateral Unnest pipeline to Sort and Limit. closes #1417 --- .../drill/exec/planner/physical/AggPrelBase.java | 2 +- .../apache/drill/exec/planner/physical/FilterPrel.java | 2 +- .../apache/drill/exec/planner/physical/LimitPrel.java | 2 +- .../org/apache/drill/exec/planner/physical/Prel.java | 8 +++++++- .../drill/exec/planner/physical/ProjectPrel.java | 2 +- .../planner/physical/SelectionVectorRemoverPrel.java | 2 +- .../apache/drill/exec/planner/physical/SortPrel.java | 2 +- .../apache/drill/exec/planner/physical/TopNPrel.java | 18 ++++++++++++++++-- .../apache/drill/exec/planner/physical/UnnestPrel.java | 2 +- .../physical/visitor/LateralUnnestRowIDVisitor.java | 6 +++--- 10 files changed, 33 insertions(+), 13 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java index a4f51f3..ca68a7d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java @@ -189,7 +189,7 @@ public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel } @Override - public Prel addImplicitRowIDCol(List<RelNode> children) { + public Prel prepareForLateralUnnestPipeline(List<RelNode> children) { List<Integer> groupingCols = Lists.newArrayList(); groupingCols.add(0); for (int groupingCol : groupSet.asList()) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java index 1c9112c..33c2944 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java @@ -85,7 +85,7 @@ public class FilterPrel extends DrillFilterRelBase implements Prel { } @Override - public Prel addImplicitRowIDCol(List<RelNode> children) { + public Prel prepareForLateralUnnestPipeline(List<RelNode> children) { RexBuilder builder = this.getCluster().getRexBuilder(); // right shift the previous field indices. return (Prel) this.copy(this.traitSet, children.get(0), DrillRelOptUtil.transformExpr(builder, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java index 057cfae..ccbff17 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java @@ -111,7 +111,7 @@ public class LimitPrel extends DrillLimitRelBase implements Prel { } @Override - public Prel addImplicitRowIDCol(List<RelNode> children) { + public Prel prepareForLateralUnnestPipeline(List<RelNode> children) { return new LimitPrel(this.getCluster(), this.traitSet, children.get(0), getOffset(), getFetch(), isPushDown(), true); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java index b72aff7..01d8e9c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java @@ -56,7 +56,13 @@ public interface Prel extends DrillRelNode, Iterable<Prel> { SelectionVectorMode getEncoding(); boolean needsFinalColumnReordering(); - default Prel addImplicitRowIDCol(List<RelNode> children) { + /** + * If the operator is in Lateral/Unnest pipeline, then it generates a new operator which knows how to process + * the rows accordingly during execution. + * eg: TopNPrel -> SortPrel and LimitPrel + * Other operators like FilterPrel, ProjectPrel etc will add an implicit row id to the output. + */ + default Prel prepareForLateralUnnestPipeline(List<RelNode> children) { throw new UnsupportedOperationException("Adding Implicit RowID column is not supported for " + this.getClass().getSimpleName() + " operator "); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java index 0a9e8bf..4d5de20 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java @@ -136,7 +136,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{ } @Override - public Prel addImplicitRowIDCol(List<RelNode> children) { + public Prel prepareForLateralUnnestPipeline(List<RelNode> children) { RelDataTypeFactory typeFactory = this.getCluster().getTypeFactory(); RexBuilder builder = this.getCluster().getRexBuilder(); List<RexNode> projects = Lists.newArrayList(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java index a4cd921..3fad017 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java @@ -55,7 +55,7 @@ public class SelectionVectorRemoverPrel extends SinglePrel{ } @Override - public Prel addImplicitRowIDCol(List<RelNode> children) { + public Prel prepareForLateralUnnestPipeline(List<RelNode> children) { return (Prel) this.copy(this.traitSet, children); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java index 686e04a..aa7158a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java @@ -124,7 +124,7 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Prel { } @Override - public Prel addImplicitRowIDCol(List<RelNode> children) { + public Prel prepareForLateralUnnestPipeline(List<RelNode> children) { List<RelFieldCollation> relFieldCollations = Lists.newArrayList(); relFieldCollations.add(new RelFieldCollation(0, RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java index 9bdcad0..3e407f7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java @@ -18,11 +18,14 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; +import java.math.BigDecimal; import java.util.List; import com.google.common.collect.Lists; import org.apache.calcite.rel.RelCollationImpl; import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.TopN; import org.apache.drill.exec.planner.cost.DrillCostBase; @@ -101,7 +104,7 @@ public class TopNPrel extends SinglePrel { } @Override - public Prel addImplicitRowIDCol(List<RelNode> children) { + public Prel prepareForLateralUnnestPipeline(List<RelNode> children) { List<RelFieldCollation> relFieldCollations = Lists.newArrayList(); relFieldCollations.add(new RelFieldCollation(0, RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)); @@ -115,6 +118,17 @@ public class TopNPrel extends SinglePrel { .replace(this.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE)) .replace(collationTrait) .replace(DRILL_PHYSICAL); - return (Prel) this.copy(traits, children); + return transformTopNToSortAndLimit(children, traits, collationTrait); + } + + private Prel transformTopNToSortAndLimit(List<RelNode> children, RelTraitSet traits, RelCollation collationTrait) { + SortPrel sortprel = new SortPrel(this.getCluster(), traits, children.get(0), collationTrait); + RexNode offset = this.getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(0), + this.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + RexNode limit = this.getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(this.limit), + this.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + //SMEX is not needed here because Lateral/Unnest pipeline doesn't support exchanges. + LimitPrel limitPrel = new LimitPrel(this.getCluster(), traits, sortprel, offset, limit, false, true); + return limitPrel; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java index 2331138..274f27a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java @@ -86,7 +86,7 @@ public class UnnestPrel extends DrillUnnestRelBase implements Prel { } @Override - public Prel addImplicitRowIDCol(List<RelNode> children) { + public Prel prepareForLateralUnnestPipeline(List<RelNode> children) { RelDataTypeFactory typeFactory = this.getCluster().getTypeFactory(); List<String> fieldNames = new ArrayList<>(); List<RelDataType> fieldTypes = new ArrayList<>(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java index 4692202..dc4af5b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java @@ -43,7 +43,7 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru public Prel visitPrel(Prel prel, Boolean isRightOfLateral) throws RuntimeException { List<RelNode> children = getChildren(prel, isRightOfLateral); if (isRightOfLateral) { - return prel.addImplicitRowIDCol(children); + return prel.prepareForLateralUnnestPipeline(children); } else { return (Prel) prel.copy(prel.getTraitSet(), children); } @@ -61,7 +61,7 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru @Override public Prel visitLateral(LateralJoinPrel prel, Boolean value) throws RuntimeException { List<RelNode> children = Lists.newArrayList(); - children.add(((Prel)prel.getInput(0)).accept(this, false)); + children.add(((Prel)prel.getInput(0)).accept(this, value)); children.add(((Prel) prel.getInput(1)).accept(this, true)); return (Prel) prel.copy(prel.getTraitSet(), children); @@ -69,6 +69,6 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru @Override public Prel visitUnnest(UnnestPrel prel, Boolean value) throws RuntimeException { - return prel.addImplicitRowIDCol(null); + return prel.prepareForLateralUnnestPipeline(null); } }
