Refactor trait pull up to common SubsetTransformer. Update Prules to use new class and update FilterPrule to use all instead of best to work with Optiq 0.9.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/913fad85 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/913fad85 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/913fad85 Branch: refs/heads/master Commit: 913fad858bbb751cde47b15a2cffda7f4797bcad Parents: 0cbf6ad Author: Jacques Nadeau <jacq...@apache.org> Authored: Sat Jul 26 21:36:53 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Sat Jul 26 21:41:10 2014 -0700 ---------------------------------------------------------------------- .../exec/planner/physical/FilterPrule.java | 31 +++--- .../exec/planner/physical/HashAggPrule.java | 64 ++++++----- .../exec/planner/physical/JoinPruleBase.java | 82 +++++++------- .../exec/planner/physical/ProjectPrule.java | 39 +++---- .../exec/planner/physical/StreamAggPrule.java | 106 +++++++++---------- .../planner/physical/SubsetTransformer.java | 69 ++++++++++++ .../exec/planner/physical/WriterPrule.java | 34 +++--- .../java/org/apache/drill/TestTpchExplain.java | 6 ++ 8 files changed, 257 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java index e72a780..c15c5e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java @@ -19,13 +19,10 @@ package org.apache.drill.exec.planner.physical; import org.apache.drill.exec.planner.logical.DrillFilterRel; import org.apache.drill.exec.planner.logical.RelOptHelper; -import org.eigenbase.rel.RelCollation; -import org.eigenbase.rel.RelCollationTraitDef; import org.eigenbase.rel.RelNode; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.RelOptRuleCall; import org.eigenbase.relopt.RelTraitSet; -import org.eigenbase.relopt.volcano.RelSubset; public class FilterPrule extends Prule { public static final RelOptRule INSTANCE = new FilterPrule(); @@ -41,19 +38,25 @@ public class FilterPrule extends Prule { RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL); RelNode convertedInput = convert(input, traits); - boolean transform = false; - - if (convertedInput instanceof RelSubset) { - RelSubset subset = (RelSubset) convertedInput; - RelNode bestRel = null; - if ((bestRel = subset.getBest()) != null) { - call.transformTo(new FilterPrel(filter.getCluster(), bestRel.getTraitSet(), convertedInput, filter.getCondition())); - transform = true; - } - } + + boolean transform = new Subset(call).go(filter, convertedInput); + if (!transform) { call.transformTo(new FilterPrel(filter.getCluster(), convertedInput.getTraitSet(), convertedInput, filter.getCondition())); } } - + + + private class Subset extends SubsetTransformer<DrillFilterRel, RuntimeException> { + + public Subset(RelOptRuleCall call) { + super(call); + } + + @Override + public RelNode convertChild(DrillFilterRel filter, RelNode rel) { + return new FilterPrel(filter.getCluster(), rel.getTraitSet(), rel, filter.getCondition()); + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java index d8b2338..4d42f66 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java @@ -26,8 +26,8 @@ import org.eigenbase.rel.InvalidRelException; import org.eigenbase.rel.RelNode; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.relopt.RelTrait; import org.eigenbase.relopt.RelTraitSet; -import org.eigenbase.relopt.volcano.RelSubset; import org.eigenbase.trace.EigenbaseTrace; import com.google.common.collect.ImmutableList; @@ -84,34 +84,8 @@ public class HashAggPrule extends AggPruleBase { traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ; RelNode convertedInput = convert(input, traits); + new TwoPhaseSubset(call, distOnAllKeys).go(aggregate, convertedInput); - if (convertedInput instanceof RelSubset) { - RelSubset subset = (RelSubset) convertedInput; - for (RelNode rel : subset.getRelList()) { - if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) { - DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); - traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist); - RelNode newInput = convert(input, traits); - - HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput, - aggregate.getGroupSet(), - aggregate.getAggCallList(), - OperatorPhase.PHASE_1of2); - - HashToRandomExchangePrel exch = - new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys), - phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true))); - - HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits, exch, - aggregate.getGroupSet(), - phase1Agg.getPhase2AggCalls(), - OperatorPhase.PHASE_2of2); - - - call.transformTo(phase2Agg); - } - } - } } } } catch (InvalidRelException e) { @@ -119,6 +93,40 @@ public class HashAggPrule extends AggPruleBase { } } + + private class TwoPhaseSubset extends SubsetTransformer<DrillAggregateRel, InvalidRelException> { + final RelTrait distOnAllKeys; + + public TwoPhaseSubset(RelOptRuleCall call, RelTrait distOnAllKeys) { + super(call); + this.distOnAllKeys = distOnAllKeys; + } + + @Override + public RelNode convertChild(DrillAggregateRel aggregate, RelNode input) throws InvalidRelException { + + RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, input.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE)); + RelNode newInput = convert(input, traits); + + HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput, + aggregate.getGroupSet(), + aggregate.getAggCallList(), + OperatorPhase.PHASE_1of2); + + HashToRandomExchangePrel exch = + new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys), + phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true))); + + HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits, exch, + aggregate.getGroupSet(), + phase1Agg.getPhase2AggCalls(), + OperatorPhase.PHASE_2of2); + + return phase2Agg; + } + + } + private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate, RelNode input, RelTraitSet traits) throws InvalidRelException { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java index 336e34c..d6bd711 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java @@ -92,39 +92,39 @@ public abstract class JoinPruleBase extends Prule { PhysicalJoinType physicalJoinType, RelNode left, RelNode right, RelCollation collationLeft, RelCollation collationRight, boolean hashSingleKey)throws InvalidRelException { - - /* If join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan: + + /* If join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan: * 1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, r2, ..., r_k) for right side. * 2) Plan2: distributed by l1 for left side, by r1 for right side. * 3) Plan3: distributed by l2 for left side, by r2 for right side. * ... * Plan_(k+1): distributed by l_k for left side, by r_k by right side. - * - * Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey. + * + * Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey. */ - + DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys()))); DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys()))); - + createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition); - + assert (join.getLeftKeys().size() == join.getRightKeys().size()); - + if (!hashSingleKey) return; - + int numJoinKeys = join.getLeftKeys().size(); if (numJoinKeys > 1) { for (int i = 0; i< numJoinKeys; i++) { hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, i+1)))); hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, i+1)))); - + createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition); } } } - + // Create join plan with both left and right children hash distributed. If the physical join type // is MergeJoin, a collation must be provided for both left and right child and the plan will contain // sort converter if necessary to provide the collation. @@ -170,9 +170,9 @@ public abstract class JoinPruleBase extends Prule { // is MergeJoin, a collation must be provided for both left and right child and the plan will contain sort converter // if necessary to provide the collation. protected void createBroadcastPlan(RelOptRuleCall call, DrillJoinRel join, - PhysicalJoinType physicalJoinType, - RelNode left, RelNode right, - RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException { + final PhysicalJoinType physicalJoinType, + final RelNode left, final RelNode right, + final RelCollation collationLeft, final RelCollation collationRight) throws InvalidRelException { DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED); RelTraitSet traitsRight = null; @@ -183,37 +183,35 @@ public abstract class JoinPruleBase extends Prule { traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight); } - RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL); - RelNode convertedLeft = convert(left, traitsLeft); - RelNode convertedRight = convert(right, traitsRight); + final RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL); + final RelNode convertedLeft = convert(left, traitsLeft); + final RelNode convertedRight = convert(right, traitsRight); - traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL); + new SubsetTransformer<DrillJoinRel, InvalidRelException>(call){ - DrillJoinRelBase newJoin = null; - - if (convertedLeft instanceof RelSubset) { - RelSubset subset = (RelSubset) convertedLeft; - for (RelNode rel : subset.getRelList()) { - if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) { - DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); - if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { - traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(toDist); - } else { - traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist); - } - - RelNode newLeft = convert(left, traitsLeft); - if (physicalJoinType == PhysicalJoinType.HASH_JOIN) { - newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(), - join.getJoinType()); - } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { - newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(), - join.getJoinType()); - } - call.transformTo(newJoin) ; + public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException { + DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); + RelTraitSet newTraitsLeft; + if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { + newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist); + } else { + newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist); + } + Character.digit(1, 1); + RelNode newLeft = convert(left, newTraitsLeft); + if (physicalJoinType == PhysicalJoinType.HASH_JOIN) { + return new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(), + join.getJoinType()); + } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) { + return new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(), + join.getJoinType()); + } else{ + return null; } + } - } - } + }.go(join, convertedLeft); + + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java index 02e6d44..833aaae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java @@ -23,19 +23,15 @@ import java.util.Map; import net.hydromatic.linq4j.Ord; -import org.apache.drill.exec.planner.common.DrillProjectRelBase; import org.apache.drill.exec.planner.logical.DrillProjectRel; -import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField; import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionType; -import org.eigenbase.rel.ProjectRel; import org.eigenbase.rel.RelCollation; import org.eigenbase.rel.RelCollationImpl; import org.eigenbase.rel.RelCollationTraitDef; import org.eigenbase.rel.RelFieldCollation; import org.eigenbase.rel.RelNode; -import org.eigenbase.relopt.Convention; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.RelOptRuleCall; import org.eigenbase.relopt.RelTraitSet; @@ -64,26 +60,33 @@ public class ProjectPrule extends Prule { RelNode convertedInput = convert(input, traits); Map<Integer, Integer> inToOut = getProjectMap(project); + boolean traitPull = new ProjectTraitPull(call, inToOut).go(project, convertedInput); - if (convertedInput instanceof RelSubset) { - RelSubset subset = (RelSubset) convertedInput; - for (RelNode rel : subset.getRelList()) { - if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) { - DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); - RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE); + if(!traitPull){ + call.transformTo(new ProjectPrel(project.getCluster(), convertedInput.getTraitSet(), convertedInput, project.getProjects(), project.getRowType())); + } + } + private class ProjectTraitPull extends SubsetTransformer<DrillProjectRel, RuntimeException> { + final Map<Integer, Integer> inToOut; - DrillDistributionTrait newDist = convertDist(childDist, inToOut); - RelCollation newCollation = convertRelCollation(childCollation, inToOut); + public ProjectTraitPull(RelOptRuleCall call, Map<Integer, Integer> inToOut) { + super(call); + this.inToOut = inToOut; + } - call.transformTo(new ProjectPrel(project.getCluster(), project.getTraitSet().plus(newDist).plus(newCollation).plus(Prel.DRILL_PHYSICAL), - rel, project.getProjects(), project.getRowType())); - } - } + @Override + public RelNode convertChild(DrillProjectRel project, RelNode rel) throws RuntimeException { + DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); + RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE); - } else{ - call.transformTo(new ProjectPrel(project.getCluster(), convertedInput.getTraitSet(), convertedInput, project.getProjects(), project.getRowType())); + + DrillDistributionTrait newDist = convertDist(childDist, inToOut); + RelCollation newCollation = convertRelCollation(childCollation, inToOut); + RelTraitSet newProjectTraits = rel.getTraitSet().plus(newDist).plus(newCollation); + return new ProjectPrel(project.getCluster(), newProjectTraits, rel, project.getProjects(), project.getRowType()); } + } private DrillDistributionTrait convertDist(DrillDistributionTrait srcDist, Map<Integer, Integer> inToOut) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java index 0375161..4191184 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java @@ -23,6 +23,7 @@ import java.util.logging.Logger; import net.hydromatic.optiq.util.BitSets; import org.apache.drill.exec.planner.logical.DrillAggregateRel; +import org.apache.drill.exec.planner.logical.DrillJoinRel; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase; import org.eigenbase.rel.InvalidRelException; @@ -56,7 +57,7 @@ public class StreamAggPrule extends AggPruleBase { public void onMatch(RelOptRuleCall call) { final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0); final RelNode input = aggregate.getChild(); - RelCollation collation = getCollation(aggregate); + final RelCollation collation = getCollation(aggregate); RelTraitSet traits = null; if (aggregate.containsDistinctCall()) { @@ -67,44 +68,40 @@ public class StreamAggPrule extends AggPruleBase { try { if (aggregate.getGroupSet().isEmpty()) { DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON; - RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist); + final RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist); if (create2PhasePlan(call, aggregate)) { traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ; RelNode convertedInput = convert(input, traits); + new SubsetTransformer<DrillAggregateRel, InvalidRelException>(call){ - if (convertedInput instanceof RelSubset) { - RelSubset subset = (RelSubset) convertedInput; - for (RelNode rel : subset.getRelList()) { - if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) { - DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); - traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist); - RelNode newInput = convert(input, traits); - - StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput, - aggregate.getGroupSet(), - aggregate.getAggCallList(), - OperatorPhase.PHASE_1of2); - - UnionExchangePrel exch = - new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg); - - StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch, - aggregate.getGroupSet(), - phase1Agg.getPhase2AggCalls(), - OperatorPhase.PHASE_2of2); - - call.transformTo(phase2Agg); - } + public RelNode convertChild(final DrillAggregateRel join, final RelNode rel) throws InvalidRelException { + DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); + RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, toDist); + RelNode newInput = convert(input, traits); + + StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput, + aggregate.getGroupSet(), + aggregate.getAggCallList(), + OperatorPhase.PHASE_1of2); + + UnionExchangePrel exch = + new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg); + + return new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch, + aggregate.getGroupSet(), + phase1Agg.getPhase2AggCalls(), + OperatorPhase.PHASE_2of2); } - } + }.go(aggregate, convertedInput); + } else { createTransformRequest(call, aggregate, input, singleDistTrait); } } else { // hash distribute on all grouping keys - DrillDistributionTrait distOnAllKeys = + final DrillDistributionTrait distOnAllKeys = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(aggregate, true))); @@ -126,39 +123,34 @@ public class StreamAggPrule extends AggPruleBase { if (create2PhasePlan(call, aggregate)) { traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ; - RelNode convertedInput = convert(input, traits); - if (convertedInput instanceof RelSubset) { - RelSubset subset = (RelSubset) convertedInput; - for (RelNode rel : subset.getRelList()) { - if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) { - DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); - traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(toDist); - RelNode newInput = convert(input, traits); - - StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput, - aggregate.getGroupSet(), - aggregate.getAggCallList(), - OperatorPhase.PHASE_1of2); - - int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints(); - - HashToMergeExchangePrel exch = - new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys), - phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)), - collation, - numEndPoints); - - StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits, exch, - aggregate.getGroupSet(), - phase1Agg.getPhase2AggCalls(), - OperatorPhase.PHASE_2of2); - - call.transformTo(phase2Agg); - } + new SubsetTransformer<DrillAggregateRel, InvalidRelException>(call){ + + public RelNode convertChild(final DrillAggregateRel aggregate, final RelNode rel) throws InvalidRelException { + DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); + RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, collation, toDist); + RelNode newInput = convert(input, traits); + + StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput, + aggregate.getGroupSet(), + aggregate.getAggCallList(), + OperatorPhase.PHASE_1of2); + + int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints(); + + HashToMergeExchangePrel exch = + new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys), + phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)), + collation, + numEndPoints); + + return new StreamAggPrel(aggregate.getCluster(), traits, exch, + aggregate.getGroupSet(), + phase1Agg.getPhase2AggCalls(), + OperatorPhase.PHASE_2of2); } - } + }.go(aggregate, convertedInput); } } } catch (InvalidRelException e) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java new file mode 100644 index 0000000..450b197 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.physical; + +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.relopt.RelTrait; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.relopt.volcano.RelSubset; + +public abstract class SubsetTransformer<T extends RelNode, E extends Exception> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SubsetTransformer.class); + + public abstract RelNode convertChild(T current, RelNode child) throws E; + + private final RelOptRuleCall call; + + public SubsetTransformer(RelOptRuleCall call){ + this.call = call; + } + + public RelTraitSet newTraitSet(RelTrait... traits){ + RelTraitSet set = call.getPlanner().emptyTraitSet(); + for(RelTrait t : traits){ + set = set.plus(t); + } + return set; + + } + + boolean go(T n, RelNode candidateSet) throws E { + if( !(candidateSet instanceof RelSubset) ) return false; + + boolean transform = false; + + for (RelNode rel : ((RelSubset)candidateSet).getRelList()) { + if (!isDefaultDist(rel)) { + RelNode out = convertChild(n, rel); + if(out != null){ + call.transformTo(out); + transform = true; + + } + } + } + + return transform; + } + + private boolean isDefaultDist(RelNode n){ + return n.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java index 42a9984..15d94fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java @@ -45,25 +45,29 @@ public class WriterPrule extends Prule{ final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL); final RelNode convertedInput = convert(input, traits); - if (convertedInput instanceof RelSubset) { - RelSubset subset = (RelSubset) convertedInput; - for (RelNode rel : subset.getRelList()) { - if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) { - DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); - RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE); - - DrillWriterRelBase newWriter = new WriterPrel(writer.getCluster(), - writer.getTraitSet().plus(childDist).plus(childCollation).plus(Prel.DRILL_PHYSICAL), - rel, writer.getCreateTableEntry()); - - call.transformTo(newWriter); - } - } - } else { + if (!new WriteTraitPull(call).go(writer, convertedInput)) { DrillWriterRelBase newWriter = new WriterPrel(writer.getCluster(), convertedInput.getTraitSet(), convertedInput, writer.getCreateTableEntry()); call.transformTo(newWriter); } } + + private class WriteTraitPull extends SubsetTransformer<DrillWriterRelBase, RuntimeException> { + + public WriteTraitPull(RelOptRuleCall call) { + super(call); + } + + @Override + public RelNode convertChild(DrillWriterRelBase writer, RelNode rel) throws RuntimeException { + DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); + RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE); + + return new WriterPrel(writer.getCluster(), + writer.getTraitSet().plus(childDist).plus(childCollation).plus(Prel.DRILL_PHYSICAL), + rel, writer.getCreateTableEntry()); + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java index 7dc5af7..1d67a3a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java @@ -133,6 +133,12 @@ public class TestTpchExplain extends BaseTestQuery{ } @Test + public void tpch19_1() throws Exception{ + doExplain("queries/tpch/19_1.sql"); + } + + + @Test public void tpch20() throws Exception{ doExplain("queries/tpch/20.sql"); }