Enhance PhysicalOperator to use OperatorIds for references rather than random ids.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1157abb4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1157abb4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1157abb4 Branch: refs/heads/master Commit: 1157abb476d10d0ecf281cd325a15eac4569b790 Parents: 2ba40e3 Author: Jacques Nadeau <[email protected]> Authored: Fri May 16 08:47:51 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Fri May 23 10:02:52 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/physical/base/AbstractBase.java | 21 ++++-- .../exec/physical/base/PhysicalOperator.java | 21 ++++-- .../exec/planner/fragment/Materializer.java | 27 +++++--- .../planner/physical/BroadcastExchangePrel.java | 2 + .../drill/exec/planner/physical/FilterPrel.java | 2 +- .../exec/planner/physical/HashAggPrel.java | 2 + .../exec/planner/physical/HashJoinPrel.java | 1 + .../physical/HashToMergeExchangePrel.java | 1 + .../physical/HashToRandomExchangePrel.java | 2 + .../drill/exec/planner/physical/LimitPrel.java | 1 + .../exec/planner/physical/MergeJoinPrel.java | 1 + .../planner/physical/PhysicalPlanCreator.java | 14 +++- .../exec/planner/physical/ProjectPrel.java | 1 + .../drill/exec/planner/physical/ScanPrel.java | 1 + .../drill/exec/planner/physical/ScreenPrel.java | 2 + .../physical/SelectionVectorRemoverPrel.java | 5 +- .../physical/SingleMergeExchangePrel.java | 2 + .../drill/exec/planner/physical/SortPrel.java | 1 + .../exec/planner/physical/StreamAggPrel.java | 9 +-- .../drill/exec/planner/physical/TopNPrel.java | 1 + .../planner/physical/UnionExchangePrel.java | 1 + .../drill/exec/planner/physical/WriterPrel.java | 5 +- .../planner/physical/explain/PrelSequencer.java | 46 ++++++++++--- .../planner/sql/handlers/DefaultSqlHandler.java | 3 +- .../apache/drill/exec/TestOpSerialization.java | 71 ++++++++++++++++++++ 25 files 
changed, 199 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index a79cbc3..a028252 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -18,9 +18,9 @@ package org.apache.drill.exec.physical.base; import org.apache.drill.common.graph.GraphVisitor; -import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; public abstract class AbstractBase implements PhysicalOperator{ @@ -28,7 +28,7 @@ public abstract class AbstractBase implements PhysicalOperator{ protected long initialAllocation = 1000000L; protected long maxAllocation = 10000000000L; - + private int id; @Override public void accept(GraphVisitor<PhysicalOperator> visitor) { @@ -36,16 +36,25 @@ public abstract class AbstractBase implements PhysicalOperator{ if(this.iterator() == null) throw new IllegalArgumentException("Null iterator for pop." 
+ this); for(PhysicalOperator o : this){ Preconditions.checkNotNull(o, String.format("Null in iterator for pop %s.", this)); - o.accept(visitor); + o.accept(visitor); } visitor.leave(this); } - + @Override public boolean isExecutable() { return true; } - + + public final void setOperatorId(int id){ + this.id = id; + } + + @Override + public int getOperatorId() { + return id; + } + @Override public SelectionVectorMode getSVMode() { return SelectionVectorMode.NONE; @@ -60,5 +69,5 @@ public abstract class AbstractBase implements PhysicalOperator{ public long getMaxAllocation() { return maxAllocation; } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index db57922..483c364 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -22,42 +22,44 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.graph.GraphValue; import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import com.fasterxml.jackson.annotation.JsonIdentityInfo; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; import com.fasterxml.jackson.annotation.JsonTypeInfo; import 
com.fasterxml.jackson.annotation.ObjectIdGenerators; @JsonInclude(Include.NON_NULL) @JsonPropertyOrder({ "@id" }) -@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id") +@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "@id") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop") public interface PhysicalOperator extends GraphValue<PhysicalOperator> { /** * Get the cost of execution of this particular operator. - * + * * @return */ @JsonIgnore public OperatorCost getCost(); - + /** * Get the estimated size of this particular operator. * @return */ @JsonIgnore public Size getSize(); - + /** * Describes whether or not a particular physical operator can actually be executed. Most physical operators can be * executed. However, Exchange nodes cannot be executed. In order to be executed, they must be converted into their * Exec sub components. - * + * * @return */ @JsonIgnore @@ -70,10 +72,10 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { */ @JsonIgnore public SelectionVectorMode getSVMode(); - + /** * Provides capability to build a set of output based on traversing a query graph tree. 
- * + * * @param physicalVisitor * @return */ @@ -97,4 +99,9 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { */ public long getMaxAllocation(); + @JsonProperty("@id") + public int getOperatorId(); + + @JsonProperty("@id") + public void setOperatorId(int id); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java index 87078a2..ef9146a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -30,21 +30,23 @@ import com.google.common.collect.Lists; public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Materializer.IndexedFragmentNode, ExecutionSetupException>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Materializer.class); - + @Override public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws ExecutionSetupException { iNode.addAllocation(exchange); if(exchange == iNode.getNode().getSendingExchange()){ - + // this is a sending exchange. PhysicalOperator child = exchange.getChild().accept(this, iNode); PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child); + materializedSender.setOperatorId(0); // logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child); return materializedSender; - + }else{ // receiving exchange. 
PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId()); + materializedReceiver.setOperatorId(Short.MAX_VALUE & exchange.getOperatorId()); // logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver); return materializedReceiver; } @@ -52,7 +54,9 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate @Override public PhysicalOperator visitGroupScan(GroupScan groupScan, IndexedFragmentNode iNode) throws ExecutionSetupException { - return groupScan.getSpecificScan(iNode.getMinorFragmentId()); + PhysicalOperator child = groupScan.getSpecificScan(iNode.getMinorFragmentId()); + child.setOperatorId(Short.MAX_VALUE & groupScan.getOperatorId()); + return child; } @Override @@ -67,9 +71,10 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate PhysicalOperator child = store.getChild().accept(this, iNode); iNode.addAllocation(store); - + try { PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId()); + o.setOperatorId(Short.MAX_VALUE & store.getOperatorId()); // logger.debug("New materialized store node {} with child {}", o, child); return o; } catch (PhysicalOperatorSetupException e) { @@ -85,13 +90,15 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate for(PhysicalOperator child : op){ children.add(child.accept(this, iNode)); } - return op.getNewWithChildren(children); + PhysicalOperator newOp = op.getNewWithChildren(children); + newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId()); + return newOp; } - + public static class IndexedFragmentNode{ final Wrapper info; final int minorFragmentId; - + public IndexedFragmentNode(int minorFragmentId, Wrapper info) { super(); this.info = info; @@ -113,7 +120,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate public void addAllocation(PhysicalOperator pop) { info.addAllocation(pop); } - + } - + } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java index e0f3ee1..8b1c720 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java @@ -89,6 +89,8 @@ public class BroadcastExchangePrel extends ExchangePrel{ } BroadcastExchange g = new BroadcastExchange(childPOP); + g.setOperatorId(creator.getOperatorId(this)); + return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java index 8420e08..9632911 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java @@ -56,7 +56,7 @@ public class FilterPrel extends DrillFilterRelBase implements Prel { PhysicalOperator childPOP = child.getPhysicalOperator(creator); Filter p = new Filter(childPOP, getFilterExpression(new DrillParseContext()), 1.0f); - + p.setOperatorId(creator.getOperatorId(this)); return p; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java index 31feb48..b9d897e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java @@ -96,6 +96,8 @@ public class HashAggPrel extends AggPrelBase implements Prel{ aggExprs.toArray(new NamedExpression[aggExprs.size()]), 1.0f); + g.setOperatorId(creator.getOperatorId(this)); + return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java index 1a528d5..87da31e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java @@ -106,6 +106,7 @@ public class HashJoinPrel extends DrillJoinRelBase implements Prel { } HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype); + hjoin.setOperatorId(creator.getOperatorId(this)); return hjoin; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java index 
262fd8c..0539a33 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java @@ -90,6 +90,7 @@ public class HashToMergeExchangePrel extends ExchangePrel { HashToMergeExchange g = new HashToMergeExchange(childPOP, PrelUtil.getHashExpression(this.distFields, getChild().getRowType()), PrelUtil.getOrdering(this.collation, getChild().getRowType())); + g.setOperatorId(creator.getOperatorId(this)); return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java index ff3728b..b9a021e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java @@ -94,6 +94,8 @@ public class HashToRandomExchangePrel extends ExchangePrel { if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP; HashToRandomExchange g = new HashToRandomExchange(childPOP, PrelUtil.getHashExpression(this.fields, getChild().getRowType())); + g.setOperatorId(creator.getOperatorId(this)); + return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java index 
5986fde..794593a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java @@ -65,6 +65,7 @@ public class LimitPrel extends DrillLimitRelBase implements Prel { Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null; Limit limit = new Limit(childPOP, first, last); + limit.setOperatorId(creator.getOperatorId(this)); return limit; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java index fe03c40..400c6a8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java @@ -110,6 +110,7 @@ public class MergeJoinPrel extends DrillJoinRelBase implements Prel { } MergeJoinPOP mjoin = new MergeJoinPOP(leftPop, rightPop, conditions, jtype); + mjoin.setOperatorId(creator.getOperatorId(this)); return mjoin; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java index 9ac07d3..f4189e4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.drill.common.logical.PlanProperties; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; @@ -27,20 +28,24 @@ import org.apache.drill.common.logical.PlanProperties.PlanType; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.planner.logical.DrillParseContext; +import org.apache.drill.exec.planner.physical.explain.PrelSequencer.OpId; import com.google.common.collect.Lists; +import com.google.hive12.common.collect.Maps; public class PhysicalPlanCreator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanCreator.class); + private final Map<Prel, OpId> opIdMap; + private List<PhysicalOperator> popList; private final QueryContext context; PhysicalPlan plan = null; - public PhysicalPlanCreator(QueryContext context) { + public PhysicalPlanCreator(QueryContext context, Map<Prel, OpId> opIdMap) { this.context = context; + this.opIdMap = opIdMap; popList = Lists.newArrayList(); } @@ -48,6 +53,11 @@ public class PhysicalPlanCreator { return context; } + public int getOperatorId(Prel prel){ + OpId id = opIdMap.get(prel); + return id.getAsSingleInt(); + } + public PhysicalPlan build(Prel rootPrel, boolean forceRebuild) { if (plan != null && !forceRebuild) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java index 1aa34d3..70dca25 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java @@ -57,6 +57,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{ PhysicalOperator childPOP = child.getPhysicalOperator(creator); Project p = new Project(this.getProjectExpressions(new DrillParseContext()), childPOP); + p.setOperatorId(creator.getOperatorId(this)); return p; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java index 74cd7a9..8461e24 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java @@ -73,6 +73,7 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel { @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + groupScan.setOperatorId(creator.getOperatorId(this)); return groupScan; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java index 36bf796..d02ed44 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java @@ -51,6 +51,8 @@ public class ScreenPrel extends DrillScreenRelBase implements Prel { PhysicalOperator childPOP = child.getPhysicalOperator(creator); Screen s = new Screen(childPOP, creator.getContext().getCurrentEndpoint()); + s.setOperatorId(creator.getOperatorId(this)); + return s; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java index 63cdcaa..fd07749 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java @@ -32,7 +32,10 @@ public class SelectionVectorRemoverPrel extends SinglePrel{ @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - return new SelectionVectorRemover( ((Prel)getChild()).getPhysicalOperator(creator)); + SelectionVectorRemover r = new SelectionVectorRemover( ((Prel)getChild()).getPhysicalOperator(creator)); + r.setOperatorId(creator.getOperatorId(this)); + return r; + } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java index 05d6e89..99d99a7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java @@ -89,6 +89,8 @@ public class SingleMergeExchangePrel extends ExchangePrel { if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP; SingleMergeExchange g = new SingleMergeExchange(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType())); + g.setOperatorId(creator.getOperatorId(this)); + return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java index d582bc6..fa5e900 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java @@ -73,6 +73,7 @@ public class SortPrel extends SortRel implements Prel { PhysicalOperator childPOP = child.getPhysicalOperator(creator); Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false); + g.setOperatorId(creator.getOperatorId(this)); return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java 
index 9706254..abd60c5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java @@ -42,7 +42,7 @@ public class StreamAggPrel extends AggPrelBase implements Prel{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamAggPrel.class); - + public StreamAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet, List<AggregateCall> aggCalls, OperatorPhase phase) throws InvalidRelException { super(cluster, traits, child, groupSet, aggCalls, phase); @@ -50,7 +50,7 @@ public class StreamAggPrel extends AggPrelBase implements Prel{ public AggregateRelBase copy(RelTraitSet traitSet, RelNode input, BitSet groupSet, List<AggregateCall> aggCalls) { try { - return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls, + return new StreamAggPrel(getCluster(), traitSet, input, getGroupSet(), aggCalls, this.getOperatorPhase()); } catch (InvalidRelException e) { throw new AssertionError(e); @@ -79,13 +79,14 @@ public class StreamAggPrel extends AggPrelBase implements Prel{ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { Prel child = (Prel) this.getChild(); - StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), + StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), aggExprs.toArray(new NamedExpression[aggExprs.size()]), 1.0f); + g.setOperatorId(creator.getOperatorId(this)); return g; } - + @Override public SelectionVectorMode[] getSupportedEncodings() { return SelectionVectorMode.ALL; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java index 3c8cfe0..0067926 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java @@ -61,6 +61,7 @@ public class TopNPrel extends SinglePrel { PhysicalOperator childPOP = child.getPhysicalOperator(creator); TopN topN = new TopN(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false, this.limit); + topN.setOperatorId(creator.getOperatorId(this)); return topN; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java index 5d6b85d..f14df72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java @@ -81,6 +81,7 @@ public class UnionExchangePrel extends ExchangePrel { if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP; UnionExchange g = new UnionExchange(childPOP); + g.setOperatorId(creator.getOperatorId(this)); return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java index 4cefeb5..e948125 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java @@ -43,7 +43,10 @@ public class WriterPrel extends DrillWriterRelBase implements Prel { @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { Prel child = (Prel) this.getChild(); - return getCreateTableEntry().getWriter(child.getPhysicalOperator(creator)); + PhysicalOperator g = getCreateTableEntry().getWriter(child.getPhysicalOperator(creator)); + g.setOperatorId(creator.getOperatorId(this)); + + return g; } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java index 169deca..2ab6c74 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java @@ -42,14 +42,17 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt if (rel == null) { return null; } - PrelSequencer s = new PrelSequencer(); final StringWriter sw = new StringWriter(); - final RelWriter planWriter = new NumberingRelWriter(s.go(rel), new PrintWriter(sw), explainlevel); + final RelWriter planWriter = new NumberingRelWriter(getIdMap(rel), new PrintWriter(sw), explainlevel); rel.explain(planWriter); return sw.toString(); } + public static Map<Prel, OpId> getIdMap(Prel rel){ + PrelSequencer s = new 
PrelSequencer(); + return s.go(rel); + } static class Frag implements Iterable<Frag>{ @@ -110,7 +113,7 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt } - static class OpId{ + public static class OpId{ int fragmentId; int opId; public OpId(int fragmentId, int opId) { @@ -118,6 +121,21 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt this.fragmentId = fragmentId; this.opId = opId; } + + + public int getFragmentId() { + return fragmentId; + } + + + public int getOpId() { + return opId; + } + + public int getAsSingleInt(){ + return (fragmentId << 16) + opId; + } + @Override public int hashCode() { final int prime = 31; @@ -172,19 +190,27 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt } // for each fragment, do a dfs of operators to assign operator ids. - Map<Prel, OpId> ids = Maps.newHashMap(); + Map<Prel, OpId> ids = Maps.newIdentityHashMap(); + + ids.put(rootFrag.root, new OpId(0, 0)); for(Frag f : frags){ - int id = 0; + int id = 1; Queue<Prel> ops = Lists.newLinkedList(); ops.add(f.root); while(!ops.isEmpty()){ Prel p = ops.remove(); - if(p instanceof ExchangePrel && p != f.root) continue; - ids.put(p, new OpId(f.majorFragmentId, id++) ); + boolean isExchange = p instanceof ExchangePrel; + + if(p != f.root){ // we account for exchanges as receivers to guarantee unique identifiers. 
+ ids.put(p, new OpId(f.majorFragmentId, id++) ); + } + - List<Prel> children = Lists.reverse(Lists.newArrayList(p.iterator())); - for(Prel child : children){ - ops.add(child); + if(!isExchange || p == f.root){ + List<Prel> children = Lists.reverse(Lists.newArrayList(p.iterator())); + for(Prel child : children){ + ops.add(child); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 1cb3cfb..b06432a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -132,8 +132,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler { } protected PhysicalOperator convertToPop(Prel prel) throws IOException { - - PhysicalPlanCreator creator = new PhysicalPlanCreator(context); + PhysicalPlanCreator creator = new PhysicalPlanCreator(context, PrelSequencer.getIdMap(prel)); PhysicalOperator op = prel.getPhysicalOperator(creator); return op; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1157abb4/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java new file mode 100644 index 0000000..ad1d6b6 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java @@ -0,0 +1,71 @@ +package org.apache.drill.exec; + +import static 
org.junit.Assert.assertEquals; + +import java.util.List; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.logical.PlanProperties; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.Filter; +import org.apache.drill.exec.physical.config.Screen; +import org.apache.drill.exec.physical.config.UnionExchange; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.store.mock.MockSubScanPOP; +import org.junit.Test; + +import com.google.hive12.common.collect.Lists; + +public class TestOpSerialization { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOpSerialization.class); + + @Test + public void testSerializedDeserialize() throws Throwable { + DrillConfig c = DrillConfig.create(); + PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + MockSubScanPOP s = new MockSubScanPOP("abc", null); + s.setOperatorId(3); + Filter f = new Filter(s, new ValueExpressions.BooleanExpression("true", ExpressionPosition.UNKNOWN), 0.1f); + f.setOperatorId(2); + UnionExchange e = new UnionExchange(f); + e.setOperatorId(1); + Screen screen = new Screen(e, CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + screen.setOperatorId(0); + + boolean reversed = false; + while(true){ + + List<PhysicalOperator> pops = Lists.newArrayList(); + pops.add(s); + pops.add(e); + pops.add(f); + pops.add(screen); + + if(reversed) pops = Lists.reverse(pops); + PhysicalPlan plan1 = new PhysicalPlan(PlanProperties.builder().build(), pops); + String json = plan1.unparse(c.getMapper().writer()); + System.out.println(json); + + PhysicalPlan plan2 = 
reader.readPhysicalPlan(json); + System.out.println("++++++++"); + System.out.println(plan2.unparse(c.getMapper().writer())); + + PhysicalOperator root = plan2.getSortedOperators(false).iterator().next(); + assertEquals(0, root.getOperatorId()); + PhysicalOperator o1 = root.iterator().next(); + assertEquals(1, o1.getOperatorId()); + PhysicalOperator o2 = o1.iterator().next(); + assertEquals(2, o2.getOperatorId()); + if(reversed) break; + reversed = !reversed; + } + + + + + } +}
