Enhance PhysicalOperator to use OperatorIds for references rather than random ids.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1157abb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1157abb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1157abb4

Branch: refs/heads/master
Commit: 1157abb476d10d0ecf281cd325a15eac4569b790
Parents: 2ba40e3
Author: Jacques Nadeau <[email protected]>
Authored: Fri May 16 08:47:51 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Fri May 23 10:02:52 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/base/AbstractBase.java  | 21 ++++--
 .../exec/physical/base/PhysicalOperator.java    | 21 ++++--
 .../exec/planner/fragment/Materializer.java     | 27 +++++---
 .../planner/physical/BroadcastExchangePrel.java |  2 +
 .../drill/exec/planner/physical/FilterPrel.java |  2 +-
 .../exec/planner/physical/HashAggPrel.java      |  2 +
 .../exec/planner/physical/HashJoinPrel.java     |  1 +
 .../physical/HashToMergeExchangePrel.java       |  1 +
 .../physical/HashToRandomExchangePrel.java      |  2 +
 .../drill/exec/planner/physical/LimitPrel.java  |  1 +
 .../exec/planner/physical/MergeJoinPrel.java    |  1 +
 .../planner/physical/PhysicalPlanCreator.java   | 14 +++-
 .../exec/planner/physical/ProjectPrel.java      |  1 +
 .../drill/exec/planner/physical/ScanPrel.java   |  1 +
 .../drill/exec/planner/physical/ScreenPrel.java |  2 +
 .../physical/SelectionVectorRemoverPrel.java    |  5 +-
 .../physical/SingleMergeExchangePrel.java       |  2 +
 .../drill/exec/planner/physical/SortPrel.java   |  1 +
 .../exec/planner/physical/StreamAggPrel.java    |  9 +--
 .../drill/exec/planner/physical/TopNPrel.java   |  1 +
 .../planner/physical/UnionExchangePrel.java     |  1 +
 .../drill/exec/planner/physical/WriterPrel.java |  5 +-
 .../planner/physical/explain/PrelSequencer.java | 46 ++++++++++---
 .../planner/sql/handlers/DefaultSqlHandler.java |  3 +-
 .../apache/drill/exec/TestOpSerialization.java  | 71 ++++++++++++++++++++
 25 files changed, 199 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index a79cbc3..a028252 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -18,9 +18,9 @@
 package org.apache.drill.exec.physical.base;
 
 import org.apache.drill.common.graph.GraphVisitor;
-import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 
 public abstract class AbstractBase implements PhysicalOperator{
@@ -28,7 +28,7 @@ public abstract class AbstractBase implements 
PhysicalOperator{
 
   protected long initialAllocation = 1000000L;
   protected long maxAllocation = 10000000000L;
-
+  private int id;
 
   @Override
   public void accept(GraphVisitor<PhysicalOperator> visitor) {
@@ -36,16 +36,25 @@ public abstract class AbstractBase implements 
PhysicalOperator{
     if(this.iterator() == null) throw new IllegalArgumentException("Null 
iterator for pop." + this);
     for(PhysicalOperator o : this){
       Preconditions.checkNotNull(o, String.format("Null in iterator for pop 
%s.", this));
-      o.accept(visitor);  
+      o.accept(visitor);
     }
     visitor.leave(this);
   }
-  
+
   @Override
   public boolean isExecutable() {
     return true;
   }
-  
+
+  public final void setOperatorId(int id){
+    this.id = id;
+  }
+
+  @Override
+  public int getOperatorId() {
+    return id;
+  }
+
   @Override
   public SelectionVectorMode getSVMode() {
     return SelectionVectorMode.NONE;
@@ -60,5 +69,5 @@ public abstract class AbstractBase implements 
PhysicalOperator{
   public long getMaxAllocation() {
     return maxAllocation;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index db57922..483c364 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -22,42 +22,44 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.graph.GraphValue;
 import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.fasterxml.jackson.annotation.JsonIdentityInfo;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 
 @JsonInclude(Include.NON_NULL)
 @JsonPropertyOrder({ "@id" })
-@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, 
property = "@id")
+@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, 
property = "@id")
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, 
property = "pop")
 public interface  PhysicalOperator extends GraphValue<PhysicalOperator> {
 
   /**
    * Get the cost of execution of this particular operator.
-   * 
+   *
    * @return
    */
   @JsonIgnore
   public OperatorCost getCost();
-  
+
   /**
    * Get the estimated size of this particular operator.
    * @return
    */
   @JsonIgnore
   public Size getSize();
-  
+
   /**
    * Describes whether or not a particular physical operator can actually be 
executed. Most physical operators can be
    * executed. However, Exchange nodes cannot be executed. In order to be 
executed, they must be converted into their
    * Exec sub components.
-   * 
+   *
    * @return
    */
   @JsonIgnore
@@ -70,10 +72,10 @@ public interface  PhysicalOperator extends 
GraphValue<PhysicalOperator> {
    */
   @JsonIgnore
   public SelectionVectorMode getSVMode();
-  
+
   /**
    * Provides capability to build a set of output based on traversing a query 
graph tree.
-   * 
+   *
    * @param physicalVisitor
    * @return
    */
@@ -97,4 +99,9 @@ public interface  PhysicalOperator extends 
GraphValue<PhysicalOperator> {
    */
   public long getMaxAllocation();
 
+  @JsonProperty("@id")
+  public int getOperatorId();
+
+  @JsonProperty("@id")
+  public void setOperatorId(int id);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 87078a2..ef9146a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -30,21 +30,23 @@ import com.google.common.collect.Lists;
 public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, 
Materializer.IndexedFragmentNode, ExecutionSetupException>{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(Materializer.class);
 
-  
+
   @Override
   public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode 
iNode) throws ExecutionSetupException {
     iNode.addAllocation(exchange);
     if(exchange == iNode.getNode().getSendingExchange()){
-      
+
       // this is a sending exchange.
       PhysicalOperator child = exchange.getChild().accept(this, iNode);
       PhysicalOperator materializedSender = 
exchange.getSender(iNode.getMinorFragmentId(), child);
+      materializedSender.setOperatorId(0);
 //      logger.debug("Visit sending exchange, materialized {} with child {}.", 
materializedSender, child);
       return materializedSender;
-      
+
     }else{
       // receiving exchange.
       PhysicalOperator materializedReceiver = 
exchange.getReceiver(iNode.getMinorFragmentId());
+      materializedReceiver.setOperatorId(Short.MAX_VALUE & 
exchange.getOperatorId());
 //      logger.debug("Visit receiving exchange, materialized receiver: {}.", 
materializedReceiver);
       return materializedReceiver;
     }
@@ -52,7 +54,9 @@ public class Materializer extends 
AbstractPhysicalVisitor<PhysicalOperator, Mate
 
   @Override
   public PhysicalOperator visitGroupScan(GroupScan groupScan, 
IndexedFragmentNode iNode) throws ExecutionSetupException {
-    return groupScan.getSpecificScan(iNode.getMinorFragmentId());
+    PhysicalOperator child = 
groupScan.getSpecificScan(iNode.getMinorFragmentId());
+    child.setOperatorId(Short.MAX_VALUE & groupScan.getOperatorId());
+    return child;
   }
 
   @Override
@@ -67,9 +71,10 @@ public class Materializer extends 
AbstractPhysicalVisitor<PhysicalOperator, Mate
     PhysicalOperator child = store.getChild().accept(this, iNode);
 
     iNode.addAllocation(store);
-    
+
     try {
       PhysicalOperator o = store.getSpecificStore(child, 
iNode.getMinorFragmentId());
+      o.setOperatorId(Short.MAX_VALUE & store.getOperatorId());
 //      logger.debug("New materialized store node {} with child {}", o, child);
       return o;
     } catch (PhysicalOperatorSetupException e) {
@@ -85,13 +90,15 @@ public class Materializer extends 
AbstractPhysicalVisitor<PhysicalOperator, Mate
     for(PhysicalOperator child : op){
       children.add(child.accept(this, iNode));
     }
-    return op.getNewWithChildren(children);
+    PhysicalOperator newOp = op.getNewWithChildren(children);
+    newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId());
+    return newOp;
   }
-  
+
   public static class IndexedFragmentNode{
     final Wrapper info;
     final int minorFragmentId;
-    
+
     public IndexedFragmentNode(int minorFragmentId, Wrapper info) {
       super();
       this.info = info;
@@ -113,7 +120,7 @@ public class Materializer extends 
AbstractPhysicalVisitor<PhysicalOperator, Mate
     public void addAllocation(PhysicalOperator pop) {
       info.addAllocation(pop);
     }
-    
+
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
index e0f3ee1..8b1c720 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
@@ -89,6 +89,8 @@ public class BroadcastExchangePrel extends ExchangePrel{
     }
 
     BroadcastExchange g = new BroadcastExchange(childPOP);
+    g.setOperatorId(creator.getOperatorId(this));
+
     return g;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index 8420e08..9632911 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -56,7 +56,7 @@ public class FilterPrel extends DrillFilterRelBase implements 
Prel {
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     Filter p = new Filter(childPOP, getFilterExpression(new 
DrillParseContext()), 1.0f);
-
+    p.setOperatorId(creator.getOperatorId(this));
     return p;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index 31feb48..b9d897e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -96,6 +96,8 @@ public class HashAggPrel extends AggPrelBase implements Prel{
         aggExprs.toArray(new NamedExpression[aggExprs.size()]),
         1.0f);
 
+    g.setOperatorId(creator.getOperatorId(this));
+
     return g;
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index 1a528d5..87da31e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -106,6 +106,7 @@ public class HashJoinPrel  extends DrillJoinRelBase 
implements Prel {
     }
 
     HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype);
+    hjoin.setOperatorId(creator.getOperatorId(this));
 
     return hjoin;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
index 262fd8c..0539a33 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java
@@ -90,6 +90,7 @@ public class HashToMergeExchangePrel extends ExchangePrel {
     HashToMergeExchange g = new HashToMergeExchange(childPOP,
         PrelUtil.getHashExpression(this.distFields, getChild().getRowType()),
         PrelUtil.getOrdering(this.collation, getChild().getRowType()));
+    g.setOperatorId(creator.getOperatorId(this));
     return g;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
index ff3728b..b9a021e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
@@ -94,6 +94,8 @@ public class HashToRandomExchangePrel extends ExchangePrel {
     if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
 
     HashToRandomExchange g = new HashToRandomExchange(childPOP, 
PrelUtil.getHashExpression(this.fields, getChild().getRowType()));
+    g.setOperatorId(creator.getOperatorId(this));
+
     return g;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 5986fde..794593a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -65,6 +65,7 @@ public class LimitPrel extends DrillLimitRelBase implements 
Prel {
     Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + 
first : null;
 
     Limit limit = new Limit(childPOP, first, last);
+    limit.setOperatorId(creator.getOperatorId(this));
 
     return limit;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
index fe03c40..400c6a8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java
@@ -110,6 +110,7 @@ public class MergeJoinPrel  extends DrillJoinRelBase 
implements Prel {
     }
 
     MergeJoinPOP mjoin = new MergeJoinPOP(leftPop, rightPop, conditions, 
jtype);
+    mjoin.setOperatorId(creator.getOperatorId(this));
 
     return mjoin;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
index 9ac07d3..f4189e4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.drill.common.logical.PlanProperties;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
@@ -27,20 +28,24 @@ import 
org.apache.drill.common.logical.PlanProperties.PlanType;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.explain.PrelSequencer.OpId;
 
 import com.google.common.collect.Lists;
+import com.google.hive12.common.collect.Maps;
 
 
 public class PhysicalPlanCreator {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PhysicalPlanCreator.class);
 
+  private final Map<Prel, OpId> opIdMap;
+
   private List<PhysicalOperator> popList;
   private final QueryContext context;
   PhysicalPlan plan = null;
 
-  public PhysicalPlanCreator(QueryContext context) {
+  public PhysicalPlanCreator(QueryContext context, Map<Prel, OpId> opIdMap) {
     this.context = context;
+    this.opIdMap = opIdMap;
     popList = Lists.newArrayList();
   }
 
@@ -48,6 +53,11 @@ public class PhysicalPlanCreator {
     return context;
   }
 
+  public int getOperatorId(Prel prel){
+    OpId id = opIdMap.get(prel);
+    return id.getAsSingleInt();
+  }
+
   public PhysicalPlan build(Prel rootPrel, boolean forceRebuild) {
 
     if (plan != null && !forceRebuild) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 1aa34d3..70dca25 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -57,6 +57,7 @@ public class ProjectPrel extends DrillProjectRelBase 
implements Prel{
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     Project p = new Project(this.getProjectExpressions(new 
DrillParseContext()),  childPOP);
+    p.setOperatorId(creator.getOperatorId(this));
 
     return p;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 74cd7a9..8461e24 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -73,6 +73,7 @@ public class ScanPrel extends AbstractRelNode implements 
DrillScanPrel {
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
       throws IOException {
+    groupScan.setOperatorId(creator.getOperatorId(this));
     return groupScan;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
index 36bf796..d02ed44 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
@@ -51,6 +51,8 @@ public class ScreenPrel extends DrillScreenRelBase implements 
Prel {
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     Screen s = new Screen(childPOP, creator.getContext().getCurrentEndpoint());
+    s.setOperatorId(creator.getOperatorId(this));
+
     return s;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
index 63cdcaa..fd07749 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
@@ -32,7 +32,10 @@ public class SelectionVectorRemoverPrel extends SinglePrel{
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
-    return new SelectionVectorRemover( 
((Prel)getChild()).getPhysicalOperator(creator));
+    SelectionVectorRemover r =  new SelectionVectorRemover( 
((Prel)getChild()).getPhysicalOperator(creator));
+    r.setOperatorId(creator.getOperatorId(this));
+    return r;
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
index 05d6e89..99d99a7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java
@@ -89,6 +89,8 @@ public class SingleMergeExchangePrel extends ExchangePrel {
     if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
 
     SingleMergeExchange g = new SingleMergeExchange(childPOP, 
PrelUtil.getOrdering(this.collation, getChild().getRowType()));
+    g.setOperatorId(creator.getOperatorId(this));
+
     return g;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index d582bc6..fa5e900 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -73,6 +73,7 @@ public class SortPrel extends SortRel implements Prel {
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, 
getChild().getRowType()), false);
+    g.setOperatorId(creator.getOperatorId(this));
 
     return g;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
index 9706254..abd60c5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java
@@ -42,7 +42,7 @@ public class StreamAggPrel extends AggPrelBase implements 
Prel{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StreamAggPrel.class);
 
 
-  
+
   public StreamAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode 
child, BitSet groupSet,
       List<AggregateCall> aggCalls, OperatorPhase phase) throws 
InvalidRelException {
     super(cluster, traits, child, groupSet, aggCalls, phase);
@@ -50,7 +50,7 @@ public class StreamAggPrel extends AggPrelBase implements 
Prel{
 
   public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet 
groupSet, List<AggregateCall> aggCalls) {
     try {
-      return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), 
aggCalls, 
+      return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), 
aggCalls,
           this.getOperatorPhase());
     } catch (InvalidRelException e) {
       throw new AssertionError(e);
@@ -79,13 +79,14 @@ public class StreamAggPrel extends AggPrelBase implements 
Prel{
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
 
     Prel child = (Prel) this.getChild();
-    StreamingAggregate g = new 
StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new 
NamedExpression[keys.size()]), 
+    StreamingAggregate g = new 
StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new 
NamedExpression[keys.size()]),
         aggExprs.toArray(new NamedExpression[aggExprs.size()]), 1.0f);
 
+    g.setOperatorId(creator.getOperatorId(this));
     return g;
 
   }
-  
+
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
     return SelectionVectorMode.ALL;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
index 3c8cfe0..0067926 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
@@ -61,6 +61,7 @@ public class TopNPrel extends SinglePrel {
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
 
     TopN topN = new TopN(childPOP, PrelUtil.getOrdering(this.collation, 
getChild().getRowType()), false, this.limit);
+    topN.setOperatorId(creator.getOperatorId(this));
 
     return topN;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
index 5d6b85d..f14df72 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java
@@ -81,6 +81,7 @@ public class UnionExchangePrel extends ExchangePrel {
     if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP;
 
     UnionExchange g = new UnionExchange(childPOP);
+    g.setOperatorId(creator.getOperatorId(this));
     return g;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
index 4cefeb5..e948125 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
@@ -43,7 +43,10 @@ public class WriterPrel extends DrillWriterRelBase 
implements Prel {
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) 
throws IOException {
     Prel child = (Prel) this.getChild();
-    return getCreateTableEntry().getWriter(child.getPhysicalOperator(creator));
+    PhysicalOperator g = 
getCreateTableEntry().getWriter(child.getPhysicalOperator(creator));
+    g.setOperatorId(creator.getOperatorId(this));
+
+    return g;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
index 169deca..2ab6c74 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java
@@ -42,14 +42,17 @@ public class PrelSequencer implements PrelVisitor<Void, 
PrelSequencer.Frag, Runt
       if (rel == null) {
         return null;
       }
-      PrelSequencer s = new PrelSequencer();
       final StringWriter sw = new StringWriter();
-      final RelWriter planWriter = new NumberingRelWriter(s.go(rel), new 
PrintWriter(sw), explainlevel);
+      final RelWriter planWriter = new NumberingRelWriter(getIdMap(rel), new 
PrintWriter(sw), explainlevel);
       rel.explain(planWriter);
       return sw.toString();
 
   }
 
+  public static Map<Prel, OpId> getIdMap(Prel rel){
+    PrelSequencer s = new PrelSequencer();
+    return s.go(rel);
+  }
 
 
   static class Frag implements Iterable<Frag>{
@@ -110,7 +113,7 @@ public class PrelSequencer implements PrelVisitor<Void, 
PrelSequencer.Frag, Runt
 
   }
 
-  static class OpId{
+  public static class OpId{
     int fragmentId;
     int opId;
     public OpId(int fragmentId, int opId) {
@@ -118,6 +121,21 @@ public class PrelSequencer implements PrelVisitor<Void, 
PrelSequencer.Frag, Runt
       this.fragmentId = fragmentId;
       this.opId = opId;
     }
+
+
+    public int getFragmentId() {
+      return fragmentId;
+    }
+
+
+    public int getOpId() {
+      return opId;
+    }
+
+    public int getAsSingleInt(){
+      return (fragmentId << 16) + opId;
+    }
+
     @Override
     public int hashCode() {
       final int prime = 31;
@@ -172,19 +190,27 @@ public class PrelSequencer implements PrelVisitor<Void, 
PrelSequencer.Frag, Runt
     }
 
     // for each fragment, do a breadth-first traversal of operators (FIFO queue) to assign operator ids.
-    Map<Prel, OpId> ids = Maps.newHashMap();
+    Map<Prel, OpId> ids = Maps.newIdentityHashMap();
+
+    ids.put(rootFrag.root, new OpId(0, 0));
     for(Frag f : frags){
-      int id = 0;
+      int id = 1;
       Queue<Prel> ops = Lists.newLinkedList();
       ops.add(f.root);
       while(!ops.isEmpty()){
         Prel p = ops.remove();
-        if(p instanceof ExchangePrel && p != f.root) continue;
-        ids.put(p, new OpId(f.majorFragmentId, id++) );
+        boolean isExchange = p instanceof ExchangePrel;
+
+        if(p != f.root){      // we account for exchanges as receivers to guarantee unique identifiers.
+          ids.put(p, new OpId(f.majorFragmentId, id++) );
+        }
+
 
-        List<Prel> children = Lists.reverse(Lists.newArrayList(p.iterator()));
-        for(Prel child : children){
-          ops.add(child);
+        if(!isExchange || p == f.root){
+          List<Prel> children = 
Lists.reverse(Lists.newArrayList(p.iterator()));
+          for(Prel child : children){
+            ops.add(child);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 1cb3cfb..b06432a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -132,8 +132,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
   }
 
   protected PhysicalOperator convertToPop(Prel prel) throws IOException {
-
-    PhysicalPlanCreator creator = new PhysicalPlanCreator(context);
+    PhysicalPlanCreator creator = new PhysicalPlanCreator(context, 
PrelSequencer.getIdMap(prel));
     PhysicalOperator op = prel.getPhysicalOperator(creator);
     return op;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
new file mode 100644
index 0000000..ad1d6b6
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
@@ -0,0 +1,71 @@
+package org.apache.drill.exec;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.PlanProperties;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.UnionExchange;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.mock.MockSubScanPOP;
+import org.junit.Test;
+
+import com.google.hive12.common.collect.Lists;
+
+public class TestOpSerialization {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestOpSerialization.class);
+
+  @Test
+  public void testSerializedDeserialize() throws Throwable {
+    DrillConfig c = DrillConfig.create();
+    PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), 
CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    MockSubScanPOP s = new MockSubScanPOP("abc", null);
+    s.setOperatorId(3);
+    Filter f = new Filter(s, new ValueExpressions.BooleanExpression("true", 
ExpressionPosition.UNKNOWN), 0.1f);
+    f.setOperatorId(2);
+    UnionExchange e = new UnionExchange(f);
+    e.setOperatorId(1);
+    Screen screen = new Screen(e, 
CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+    screen.setOperatorId(0);
+
+    boolean reversed = false;
+    while(true){
+
+      List<PhysicalOperator> pops = Lists.newArrayList();
+      pops.add(s);
+      pops.add(e);
+      pops.add(f);
+      pops.add(screen);
+
+      if(reversed) pops = Lists.reverse(pops);
+      PhysicalPlan plan1 = new PhysicalPlan(PlanProperties.builder().build(), 
pops);
+      String json = plan1.unparse(c.getMapper().writer());
+      System.out.println(json);
+
+      PhysicalPlan plan2 = reader.readPhysicalPlan(json);
+      System.out.println("++++++++");
+      System.out.println(plan2.unparse(c.getMapper().writer()));
+
+      PhysicalOperator root = 
plan2.getSortedOperators(false).iterator().next();
+      assertEquals(0, root.getOperatorId());
+      PhysicalOperator o1 = root.iterator().next();
+      assertEquals(1, o1.getOperatorId());
+      PhysicalOperator o2 = o1.iterator().next();
+      assertEquals(2, o2.getOperatorId());
+      if(reversed) break;
+      reversed = !reversed;
+    }
+
+
+
+
+  }
+}

Reply via email to