Refactor the trait pull-up logic into a common SubsetTransformer. Update the Prules to use
the new class, and update FilterPrule to consider all rels in the subset instead of only the
best one, so that it works with Optiq 0.9.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/913fad85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/913fad85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/913fad85

Branch: refs/heads/master
Commit: 913fad858bbb751cde47b15a2cffda7f4797bcad
Parents: 0cbf6ad
Author: Jacques Nadeau <jacq...@apache.org>
Authored: Sat Jul 26 21:36:53 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Sat Jul 26 21:41:10 2014 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/FilterPrule.java      |  31 +++---
 .../exec/planner/physical/HashAggPrule.java     |  64 ++++++-----
 .../exec/planner/physical/JoinPruleBase.java    |  82 +++++++-------
 .../exec/planner/physical/ProjectPrule.java     |  39 +++----
 .../exec/planner/physical/StreamAggPrule.java   | 106 +++++++++----------
 .../planner/physical/SubsetTransformer.java     |  69 ++++++++++++
 .../exec/planner/physical/WriterPrule.java      |  34 +++---
 .../java/org/apache/drill/TestTpchExplain.java  |   6 ++
 8 files changed, 257 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
index e72a780..c15c5e0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
@@ -19,13 +19,10 @@ package org.apache.drill.exec.planner.physical;
 
 import org.apache.drill.exec.planner.logical.DrillFilterRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.eigenbase.rel.RelCollation;
-import org.eigenbase.rel.RelCollationTraitDef;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
-import org.eigenbase.relopt.volcano.RelSubset;
 
 public class FilterPrule extends Prule {
   public static final RelOptRule INSTANCE = new FilterPrule();
@@ -41,19 +38,25 @@ public class FilterPrule extends Prule {
 
     RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
     RelNode convertedInput = convert(input, traits);
-    boolean transform = false;
-    
-    if (convertedInput instanceof RelSubset) {
-      RelSubset subset = (RelSubset) convertedInput;
-      RelNode bestRel = null;
-      if ((bestRel = subset.getBest()) != null) {
-        call.transformTo(new FilterPrel(filter.getCluster(), 
bestRel.getTraitSet(), convertedInput, filter.getCondition()));  
-        transform = true;
-      } 
-    }
+
+    boolean transform = new Subset(call).go(filter, convertedInput);
+
     if (!transform) {
       call.transformTo(new FilterPrel(filter.getCluster(), 
convertedInput.getTraitSet(), convertedInput, filter.getCondition()));
     }
   }
-  
+
+
+  private class Subset extends SubsetTransformer<DrillFilterRel, 
RuntimeException> {
+
+    public Subset(RelOptRuleCall call) {
+      super(call);
+    }
+
+    @Override
+    public RelNode convertChild(DrillFilterRel filter, RelNode rel) {
+      return new FilterPrel(filter.getCluster(), rel.getTraitSet(), rel, 
filter.getCondition());
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index d8b2338..4d42f66 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -26,8 +26,8 @@ import org.eigenbase.rel.InvalidRelException;
 import org.eigenbase.rel.RelNode;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTrait;
 import org.eigenbase.relopt.RelTraitSet;
-import org.eigenbase.relopt.volcano.RelSubset;
 import org.eigenbase.trace.EigenbaseTrace;
 
 import com.google.common.collect.ImmutableList;
@@ -84,34 +84,8 @@ public class HashAggPrule extends AggPruleBase {
           traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) 
;
 
           RelNode convertedInput = convert(input, traits);
+          new TwoPhaseSubset(call, distOnAllKeys).go(aggregate, 
convertedInput);
 
-          if (convertedInput instanceof RelSubset) {
-            RelSubset subset = (RelSubset) convertedInput;
-            for (RelNode rel : subset.getRelList()) {
-              if 
(!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
 {
-                DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-                traits = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
-                RelNode newInput = convert(input, traits);
-
-                HashAggPrel phase1Agg = new 
HashAggPrel(aggregate.getCluster(), traits, newInput,
-                    aggregate.getGroupSet(),
-                    aggregate.getAggCallList(), 
-                    OperatorPhase.PHASE_1of2);
-
-                HashToRandomExchangePrel exch =
-                    new HashToRandomExchangePrel(phase1Agg.getCluster(), 
phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
-                        phase1Agg, 
ImmutableList.copyOf(getDistributionField(aggregate, true)));
-
-                HashAggPrel phase2Agg =  new 
HashAggPrel(aggregate.getCluster(), traits, exch,
-                                                         
aggregate.getGroupSet(),
-                                                         
phase1Agg.getPhase2AggCalls(), 
-                                                         
OperatorPhase.PHASE_2of2); 
-                                                    
-
-                call.transformTo(phase2Agg);
-              }
-            }
-          }
         }
       }
     } catch (InvalidRelException e) {
@@ -119,6 +93,40 @@ public class HashAggPrule extends AggPruleBase {
     }
   }
 
+
+  private class TwoPhaseSubset extends SubsetTransformer<DrillAggregateRel, 
InvalidRelException> {
+    final RelTrait distOnAllKeys;
+
+    public TwoPhaseSubset(RelOptRuleCall call, RelTrait distOnAllKeys) {
+      super(call);
+      this.distOnAllKeys = distOnAllKeys;
+    }
+
+    @Override
+    public RelNode convertChild(DrillAggregateRel aggregate, RelNode input) 
throws InvalidRelException {
+
+      RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, 
input.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE));
+      RelNode newInput = convert(input, traits);
+
+      HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, 
newInput,
+          aggregate.getGroupSet(),
+          aggregate.getAggCallList(),
+          OperatorPhase.PHASE_1of2);
+
+      HashToRandomExchangePrel exch =
+          new HashToRandomExchangePrel(phase1Agg.getCluster(), 
phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+              phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, 
true)));
+
+      HashAggPrel phase2Agg =  new HashAggPrel(aggregate.getCluster(), traits, 
exch,
+                                               aggregate.getGroupSet(),
+                                               phase1Agg.getPhase2AggCalls(),
+                                               OperatorPhase.PHASE_2of2);
+
+      return phase2Agg;
+    }
+
+  }
+
   private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel 
aggregate,
                                       RelNode input, RelTraitSet traits) 
throws InvalidRelException {
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 336e34c..d6bd711 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -92,39 +92,39 @@ public abstract class JoinPruleBase extends Prule {
       PhysicalJoinType physicalJoinType,
       RelNode left, RelNode right,
       RelCollation collationLeft, RelCollation collationRight, boolean 
hashSingleKey)throws InvalidRelException {
-    
-    /* If join keys are  l1 = r1 and l2 = r2 and ... l_k = r_k, then consider 
the following options of plan:  
+
+    /* If join keys are  l1 = r1 and l2 = r2 and ... l_k = r_k, then consider 
the following options of plan:
      *   1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, 
r2, ..., r_k) for right side.
      *   2) Plan2: distributed by l1 for left side, by r1 for right side.
      *   3) Plan3: distributed by l2 for left side, by r2 for right side.
      *   ...
      *      Plan_(k+1): distributed by l_k for left side, by r_k by right side.
-     *   
-     *   Whether enumerate plan 2, .., Plan_(k+1) depends on option : 
hashSingleKey. 
+     *
+     *   Whether enumerate plan 2, .., Plan_(k+1) depends on option : 
hashSingleKey.
      */
-         
+
     DrillDistributionTrait hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
     DrillDistributionTrait hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
- 
+
     createDistBothPlan(call, join, physicalJoinType, left, right, 
collationLeft, collationRight, hashLeftPartition, hashRightPartition);
-    
+
     assert (join.getLeftKeys().size() == join.getRightKeys().size());
-    
+
     if (!hashSingleKey)
       return;
-    
+
     int numJoinKeys = join.getLeftKeys().size();
     if (numJoinKeys > 1) {
       for (int i = 0; i< numJoinKeys; i++) {
         hashLeftPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, 
i+1))));
         hashRightPartition = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
 ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, 
i+1))));
-        
+
         createDistBothPlan(call, join, physicalJoinType, left, right, 
collationLeft, collationRight, hashLeftPartition, hashRightPartition);
       }
     }
   }
 
-      
+
   // Create join plan with both left and right children hash distributed. If 
the physical join type
   // is MergeJoin, a collation must be provided for both left and right child 
and the plan will contain
   // sort converter if necessary to provide the collation.
@@ -170,9 +170,9 @@ public abstract class JoinPruleBase extends Prule {
   // is MergeJoin, a collation must be provided for both left and right child 
and the plan will contain sort converter
   // if necessary to provide the collation.
   protected void createBroadcastPlan(RelOptRuleCall call, DrillJoinRel join,
-      PhysicalJoinType physicalJoinType,
-      RelNode left, RelNode right,
-      RelCollation collationLeft, RelCollation collationRight) throws 
InvalidRelException {
+      final PhysicalJoinType physicalJoinType,
+      final RelNode left, final RelNode right,
+      final RelCollation collationLeft, final RelCollation collationRight) 
throws InvalidRelException {
 
     DrillDistributionTrait distBroadcastRight = new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
     RelTraitSet traitsRight = null;
@@ -183,37 +183,35 @@ public abstract class JoinPruleBase extends Prule {
       traitsRight = 
right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
     }
 
-    RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
-    RelNode convertedLeft = convert(left, traitsLeft);
-    RelNode convertedRight = convert(right, traitsRight);
+    final RelTraitSet traitsLeft = 
left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    final RelNode convertedLeft = convert(left, traitsLeft);
+    final RelNode convertedRight = convert(right, traitsRight);
 
-    traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    new SubsetTransformer<DrillJoinRel, InvalidRelException>(call){
 
-    DrillJoinRelBase newJoin = null;
-
-    if (convertedLeft instanceof RelSubset) {
-      RelSubset subset = (RelSubset) convertedLeft;
-      for (RelNode rel : subset.getRelList()) {
-        if 
(!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
 {
-          DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-          if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-            traitsLeft = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(toDist);
-          } else {
-            traitsLeft = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
-          }
-
-          RelNode newLeft = convert(left, traitsLeft);
-          if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
-            newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, 
convertedRight, join.getCondition(),
-                                       join.getJoinType());
-          } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
-            newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, 
newLeft, convertedRight, join.getCondition(),
-                                        join.getJoinType());
-          }
-          call.transformTo(newJoin) ;
+      public RelNode convertChild(final DrillJoinRel join, final RelNode rel) 
throws InvalidRelException {
+        DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+        RelTraitSet newTraitsLeft;
+        if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+          newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, 
toDist);
+        } else {
+          newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+        }
+        Character.digit(1, 1);
+        RelNode newLeft = convert(left, newTraitsLeft);
+        if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+          return new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, 
convertedRight, join.getCondition(),
+                                     join.getJoinType());
+        } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+          return new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, 
convertedRight, join.getCondition(),
+                                      join.getJoinType());
+        } else{
+          return null;
         }
+
       }
-    }
-  }
 
+    }.go(join, convertedLeft);
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
index 02e6d44..833aaae 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
@@ -23,19 +23,15 @@ import java.util.Map;
 
 import net.hydromatic.linq4j.Ord;
 
-import org.apache.drill.exec.planner.common.DrillProjectRelBase;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
-import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import 
org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import 
org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionType;
-import org.eigenbase.rel.ProjectRel;
 import org.eigenbase.rel.RelCollation;
 import org.eigenbase.rel.RelCollationImpl;
 import org.eigenbase.rel.RelCollationTraitDef;
 import org.eigenbase.rel.RelFieldCollation;
 import org.eigenbase.rel.RelNode;
-import org.eigenbase.relopt.Convention;
 import org.eigenbase.relopt.RelOptRule;
 import org.eigenbase.relopt.RelOptRuleCall;
 import org.eigenbase.relopt.RelTraitSet;
@@ -64,26 +60,33 @@ public class ProjectPrule extends Prule {
     RelNode convertedInput = convert(input, traits);
 
     Map<Integer, Integer> inToOut = getProjectMap(project);
+    boolean traitPull = new ProjectTraitPull(call, inToOut).go(project, 
convertedInput);
 
-    if (convertedInput instanceof RelSubset) {
-      RelSubset subset = (RelSubset) convertedInput;
-      for (RelNode rel : subset.getRelList()) {
-        if 
(!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
 {
-          DrillDistributionTrait childDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-          RelCollation childCollation = 
rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+    if(!traitPull){
+      call.transformTo(new ProjectPrel(project.getCluster(), 
convertedInput.getTraitSet(), convertedInput, project.getProjects(), 
project.getRowType()));
+    }
+  }
 
+  private class ProjectTraitPull extends SubsetTransformer<DrillProjectRel, 
RuntimeException> {
+    final Map<Integer, Integer> inToOut;
 
-          DrillDistributionTrait newDist = convertDist(childDist, inToOut);
-          RelCollation newCollation = convertRelCollation(childCollation, 
inToOut);
+    public ProjectTraitPull(RelOptRuleCall call, Map<Integer, Integer> 
inToOut) {
+      super(call);
+      this.inToOut = inToOut;
+    }
 
-          call.transformTo(new ProjectPrel(project.getCluster(), 
project.getTraitSet().plus(newDist).plus(newCollation).plus(Prel.DRILL_PHYSICAL),
-              rel, project.getProjects(), project.getRowType()));
-        }
-      }
+    @Override
+    public RelNode convertChild(DrillProjectRel project, RelNode rel) throws 
RuntimeException {
+      DrillDistributionTrait childDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+      RelCollation childCollation = 
rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
 
-    } else{
-      call.transformTo(new ProjectPrel(project.getCluster(), 
convertedInput.getTraitSet(), convertedInput, project.getProjects(), 
project.getRowType()));
+
+      DrillDistributionTrait newDist = convertDist(childDist, inToOut);
+      RelCollation newCollation = convertRelCollation(childCollation, inToOut);
+      RelTraitSet newProjectTraits = 
rel.getTraitSet().plus(newDist).plus(newCollation);
+      return new ProjectPrel(project.getCluster(), newProjectTraits, rel, 
project.getProjects(), project.getRowType());
     }
+
   }
 
   private DrillDistributionTrait convertDist(DrillDistributionTrait srcDist, 
Map<Integer, Integer> inToOut) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index 0375161..4191184 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
 import net.hydromatic.optiq.util.BitSets;
 
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
 import org.eigenbase.rel.InvalidRelException;
@@ -56,7 +57,7 @@ public class StreamAggPrule extends AggPruleBase {
   public void onMatch(RelOptRuleCall call) {
     final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
     final RelNode input = aggregate.getChild();
-    RelCollation collation = getCollation(aggregate);
+    final RelCollation collation = getCollation(aggregate);
     RelTraitSet traits = null;
 
     if (aggregate.containsDistinctCall()) {
@@ -67,44 +68,40 @@ public class StreamAggPrule extends AggPruleBase {
     try {
       if (aggregate.getGroupSet().isEmpty()) {
         DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
-        RelTraitSet singleDistTrait = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
+        final RelTraitSet singleDistTrait = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
 
         if (create2PhasePlan(call, aggregate)) {
           traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) 
;
 
           RelNode convertedInput = convert(input, traits);
+          new SubsetTransformer<DrillAggregateRel, InvalidRelException>(call){
 
-          if (convertedInput instanceof RelSubset) {
-            RelSubset subset = (RelSubset) convertedInput;
-            for (RelNode rel : subset.getRelList()) {
-              if 
(!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
 {
-                DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-                traits = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
-                RelNode newInput = convert(input, traits);
-
-                StreamAggPrel phase1Agg = new 
StreamAggPrel(aggregate.getCluster(), traits, newInput,
-                    aggregate.getGroupSet(),
-                    aggregate.getAggCallList(),
-                    OperatorPhase.PHASE_1of2);
-
-                UnionExchangePrel exch =
-                    new UnionExchangePrel(phase1Agg.getCluster(), 
singleDistTrait, phase1Agg);
-
-                StreamAggPrel phase2Agg =  new 
StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
-                    aggregate.getGroupSet(),
-                    phase1Agg.getPhase2AggCalls(),
-                    OperatorPhase.PHASE_2of2);
-
-                call.transformTo(phase2Agg);
-              }
+            public RelNode convertChild(final DrillAggregateRel join, final 
RelNode rel) throws InvalidRelException {
+              DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+              RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+              RelNode newInput = convert(input, traits);
+
+              StreamAggPrel phase1Agg = new 
StreamAggPrel(aggregate.getCluster(), traits, newInput,
+                  aggregate.getGroupSet(),
+                  aggregate.getAggCallList(),
+                  OperatorPhase.PHASE_1of2);
+
+              UnionExchangePrel exch =
+                  new UnionExchangePrel(phase1Agg.getCluster(), 
singleDistTrait, phase1Agg);
+
+              return  new StreamAggPrel(aggregate.getCluster(), 
singleDistTrait, exch,
+                  aggregate.getGroupSet(),
+                  phase1Agg.getPhase2AggCalls(),
+                  OperatorPhase.PHASE_2of2);
             }
-          }
+          }.go(aggregate, convertedInput);
+
         } else {
           createTransformRequest(call, aggregate, input, singleDistTrait);
         }
       } else {
         // hash distribute on all grouping keys
-        DrillDistributionTrait distOnAllKeys =
+        final DrillDistributionTrait distOnAllKeys =
             new 
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
                                        
ImmutableList.copyOf(getDistributionField(aggregate, true)));
 
@@ -126,39 +123,34 @@ public class StreamAggPrule extends AggPruleBase {
 
         if (create2PhasePlan(call, aggregate)) {
           traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) 
;
-
           RelNode convertedInput = convert(input, traits);
 
-          if (convertedInput instanceof RelSubset) {
-            RelSubset subset = (RelSubset) convertedInput;
-            for (RelNode rel : subset.getRelList()) {
-              if 
(!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
 {
-                DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-                traits = 
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(toDist);
-                RelNode newInput = convert(input, traits);
-
-                StreamAggPrel phase1Agg = new 
StreamAggPrel(aggregate.getCluster(), traits, newInput,
-                    aggregate.getGroupSet(),
-                    aggregate.getAggCallList(),
-                    OperatorPhase.PHASE_1of2);
-
-                int numEndPoints = 
PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
-
-                HashToMergeExchangePrel exch =
-                    new HashToMergeExchangePrel(phase1Agg.getCluster(), 
phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
-                        phase1Agg, 
ImmutableList.copyOf(getDistributionField(aggregate, true)),
-                        collation,
-                        numEndPoints);
-
-                StreamAggPrel phase2Agg =  new 
StreamAggPrel(aggregate.getCluster(), traits, exch,
-                    aggregate.getGroupSet(),
-                    phase1Agg.getPhase2AggCalls(),
-                    OperatorPhase.PHASE_2of2);
-
-                call.transformTo(phase2Agg);
-              }
+          new SubsetTransformer<DrillAggregateRel, InvalidRelException>(call){
+
+            public RelNode convertChild(final DrillAggregateRel aggregate, 
final RelNode rel) throws InvalidRelException {
+              DrillDistributionTrait toDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+              RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, collation, 
toDist);
+              RelNode newInput = convert(input, traits);
+
+              StreamAggPrel phase1Agg = new 
StreamAggPrel(aggregate.getCluster(), traits, newInput,
+                  aggregate.getGroupSet(),
+                  aggregate.getAggCallList(),
+                  OperatorPhase.PHASE_1of2);
+
+              int numEndPoints = 
PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
+
+              HashToMergeExchangePrel exch =
+                  new HashToMergeExchangePrel(phase1Agg.getCluster(), 
phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+                      phase1Agg, 
ImmutableList.copyOf(getDistributionField(aggregate, true)),
+                      collation,
+                      numEndPoints);
+
+              return new StreamAggPrel(aggregate.getCluster(), traits, exch,
+                  aggregate.getGroupSet(),
+                  phase1Agg.getPhase2AggCalls(),
+                  OperatorPhase.PHASE_2of2);
             }
-          }
+          }.go(aggregate, convertedInput);
         }
       }
     } catch (InvalidRelException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
new file mode 100644
index 0000000..450b197
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTrait;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
+
+public abstract class SubsetTransformer<T extends RelNode, E extends 
Exception> {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(SubsetTransformer.class);
+
+  public abstract RelNode convertChild(T current, RelNode child) throws E;
+
+  private final RelOptRuleCall call;
+
+  public SubsetTransformer(RelOptRuleCall call){
+    this.call = call;
+  }
+
+  public RelTraitSet newTraitSet(RelTrait... traits){
+    RelTraitSet set = call.getPlanner().emptyTraitSet();
+    for(RelTrait t : traits){
+      set = set.plus(t);
+    }
+    return set;
+
+  }
+
+  boolean go(T n, RelNode candidateSet) throws E {
+    if( !(candidateSet instanceof RelSubset) ) return false;
+
+    boolean transform = false;
+
+    for (RelNode rel : ((RelSubset)candidateSet).getRelList()) {
+      if (!isDefaultDist(rel)) {
+        RelNode out = convertChild(n, rel);
+        if(out != null){
+          call.transformTo(out);
+          transform = true;
+
+        }
+      }
+    }
+
+    return transform;
+  }
+
+  private boolean isDefaultDist(RelNode n){
+    return 
n.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
index 42a9984..15d94fb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
@@ -45,25 +45,29 @@ public class WriterPrule extends Prule{
     final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
     final RelNode convertedInput = convert(input, traits);
 
-    if (convertedInput instanceof RelSubset) {
-      RelSubset subset = (RelSubset) convertedInput;
-      for (RelNode rel : subset.getRelList()) {
-        if 
(!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT))
 {
-          DrillDistributionTrait childDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
-          RelCollation childCollation = 
rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
-
-          DrillWriterRelBase newWriter = new WriterPrel(writer.getCluster(),
-              
writer.getTraitSet().plus(childDist).plus(childCollation).plus(Prel.DRILL_PHYSICAL),
-              rel, writer.getCreateTableEntry());
-
-          call.transformTo(newWriter);
-        }
-      }
-    } else {
+    if (!new WriteTraitPull(call).go(writer, convertedInput)) {
       DrillWriterRelBase newWriter = new WriterPrel(writer.getCluster(), 
convertedInput.getTraitSet(),
           convertedInput, writer.getCreateTableEntry());
 
       call.transformTo(newWriter);
     }
   }
+
+  private class WriteTraitPull extends SubsetTransformer<DrillWriterRelBase, 
RuntimeException> {
+
+    public WriteTraitPull(RelOptRuleCall call) {
+      super(call);
+    }
+
+    @Override
+    public RelNode convertChild(DrillWriterRelBase writer, RelNode rel) throws 
RuntimeException {
+      DrillDistributionTrait childDist = 
rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+      RelCollation childCollation = 
rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+
+      return new WriterPrel(writer.getCluster(),
+          
writer.getTraitSet().plus(childDist).plus(childCollation).plus(Prel.DRILL_PHYSICAL),
+          rel, writer.getCreateTableEntry());
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/913fad85/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
index 7dc5af7..1d67a3a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchExplain.java
@@ -133,6 +133,12 @@ public class TestTpchExplain extends BaseTestQuery{
   }
 
   @Test
+  public void tpch19_1() throws Exception{
+    doExplain("queries/tpch/19_1.sql");
+  }
+
+
+  @Test
   public void tpch20() throws Exception{
     doExplain("queries/tpch/20.sql");
   }

Reply via email to