wip to support op numbering throughout exec.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ebf3d340 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ebf3d340 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ebf3d340 Branch: refs/heads/diagnostics2 Commit: ebf3d340497afeceb93d7e7c8211c5eebfce9ebf Parents: e14a38c Author: Jacques Nadeau <[email protected]> Authored: Fri May 16 10:52:15 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon May 19 09:11:22 2014 -0700 ---------------------------------------------------------------------- .../exec/planner/fragment/Materializer.java | 27 +++++++----- .../planner/physical/BroadcastExchangePrel.java | 2 + .../drill/exec/planner/physical/FilterPrel.java | 2 +- .../exec/planner/physical/HashAggPrel.java | 2 + .../exec/planner/physical/HashJoinPrel.java | 1 + .../physical/HashToMergeExchangePrel.java | 1 + .../physical/HashToRandomExchangePrel.java | 2 + .../drill/exec/planner/physical/LimitPrel.java | 1 + .../exec/planner/physical/MergeJoinPrel.java | 1 + .../planner/physical/PhysicalPlanCreator.java | 14 +++++- .../exec/planner/physical/ProjectPrel.java | 1 + .../drill/exec/planner/physical/ScanPrel.java | 1 + .../drill/exec/planner/physical/ScreenPrel.java | 2 + .../physical/SelectionVectorRemoverPrel.java | 5 ++- .../physical/SingleMergeExchangePrel.java | 2 + .../drill/exec/planner/physical/SortPrel.java | 1 + .../exec/planner/physical/StreamAggPrel.java | 2 +- .../drill/exec/planner/physical/TopNPrel.java | 1 + .../planner/physical/UnionExchangePrel.java | 1 + .../drill/exec/planner/physical/WriterPrel.java | 5 ++- .../planner/physical/explain/PrelSequencer.java | 46 +++++++++++++++----- .../planner/sql/handlers/DefaultSqlHandler.java | 3 +- .../apache/drill/exec/TestOpSerialization.java | 10 +++-- 23 files changed, 102 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java index 87078a2..ef9146a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -30,21 +30,23 @@ import com.google.common.collect.Lists; public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Materializer.IndexedFragmentNode, ExecutionSetupException>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Materializer.class); - + @Override public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws ExecutionSetupException { iNode.addAllocation(exchange); if(exchange == iNode.getNode().getSendingExchange()){ - + // this is a sending exchange. PhysicalOperator child = exchange.getChild().accept(this, iNode); PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child); + materializedSender.setOperatorId(0); // logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child); return materializedSender; - + }else{ // receiving exchange. PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId()); + materializedReceiver.setOperatorId(Short.MAX_VALUE & exchange.getOperatorId()); // logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver); return materializedReceiver; } @@ -52,7 +54,9 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate @Override public PhysicalOperator visitGroupScan(GroupScan groupScan, IndexedFragmentNode iNode) throws ExecutionSetupException { - return groupScan.getSpecificScan(iNode.getMinorFragmentId()); + PhysicalOperator child = groupScan.getSpecificScan(iNode.getMinorFragmentId()); + child.setOperatorId(Short.MAX_VALUE & groupScan.getOperatorId()); + return child; } @Override @@ -67,9 +71,10 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate PhysicalOperator child = store.getChild().accept(this, iNode); iNode.addAllocation(store); - + try { PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId()); + o.setOperatorId(Short.MAX_VALUE & store.getOperatorId()); // logger.debug("New materialized store node {} with child {}", o, child); return o; } catch (PhysicalOperatorSetupException e) { @@ -85,13 +90,15 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate for(PhysicalOperator child : op){ children.add(child.accept(this, iNode)); } - return op.getNewWithChildren(children); + PhysicalOperator newOp = op.getNewWithChildren(children); + newOp.setOperatorId(Short.MAX_VALUE & op.getOperatorId()); + return newOp; } - + public static class IndexedFragmentNode{ final Wrapper info; final int minorFragmentId; - + public IndexedFragmentNode(int minorFragmentId, Wrapper info) { super(); this.info = info; @@ -113,7 +120,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate public void addAllocation(PhysicalOperator pop) { info.addAllocation(pop); } - + } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java index e0f3ee1..8b1c720 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java @@ -89,6 +89,8 @@ public class BroadcastExchangePrel extends ExchangePrel{ } BroadcastExchange g = new BroadcastExchange(childPOP); + g.setOperatorId(creator.getOperatorId(this)); + return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java index 8420e08..9632911 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java @@ -56,7 +56,7 @@ public class FilterPrel extends DrillFilterRelBase implements Prel { PhysicalOperator childPOP = child.getPhysicalOperator(creator); Filter p = new Filter(childPOP, getFilterExpression(new DrillParseContext()), 1.0f); - + p.setOperatorId(creator.getOperatorId(this)); return p; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java index b2378be..6377e35 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java @@ -110,6 +110,8 @@ public class HashAggPrel extends AggregateRelBase implements Prel{ exprs.toArray(new NamedExpression[exprs.size()]), 1.0f); + g.setOperatorId(creator.getOperatorId(this)); + return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java index 1a528d5..87da31e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java @@ -106,6 +106,7 @@ public class HashJoinPrel extends DrillJoinRelBase implements Prel { } HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype); + hjoin.setOperatorId(creator.getOperatorId(this)); return hjoin; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java index 262fd8c..0539a33 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToMergeExchangePrel.java @@ -90,6 +90,7 @@ public class HashToMergeExchangePrel extends ExchangePrel { HashToMergeExchange g = new HashToMergeExchange(childPOP, PrelUtil.getHashExpression(this.distFields, getChild().getRowType()), PrelUtil.getOrdering(this.collation, getChild().getRowType())); + g.setOperatorId(creator.getOperatorId(this)); return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java index ec9ed79..a5699cd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java @@ -94,6 +94,8 @@ public class HashToRandomExchangePrel extends ExchangePrel { if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP; HashToRandomExchange g = new HashToRandomExchange(childPOP, PrelUtil.getHashExpression(this.fields, getChild().getRowType())); + g.setOperatorId(creator.getOperatorId(this)); + return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java index 5986fde..794593a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java @@ -65,6 +65,7 @@ public class LimitPrel extends DrillLimitRelBase implements Prel { Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null; Limit limit = new Limit(childPOP, first, last); + limit.setOperatorId(creator.getOperatorId(this)); return limit; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java index fe03c40..400c6a8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java @@ -110,6 +110,7 @@ public class MergeJoinPrel extends DrillJoinRelBase implements Prel { } MergeJoinPOP mjoin = new MergeJoinPOP(leftPop, rightPop, conditions, jtype); + mjoin.setOperatorId(creator.getOperatorId(this)); return mjoin; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java index 9ac07d3..f4189e4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.physical; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.drill.common.logical.PlanProperties; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; @@ -27,20 +28,24 @@ import org.apache.drill.common.logical.PlanProperties.PlanType; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.planner.logical.DrillParseContext; +import org.apache.drill.exec.planner.physical.explain.PrelSequencer.OpId; import com.google.common.collect.Lists; +import com.google.hive12.common.collect.Maps; public class PhysicalPlanCreator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanCreator.class); + private final Map<Prel, OpId> opIdMap; + private List<PhysicalOperator> popList; private final QueryContext context; PhysicalPlan plan = null; - public PhysicalPlanCreator(QueryContext context) { + public PhysicalPlanCreator(QueryContext context, Map<Prel, OpId> opIdMap) { this.context = context; + this.opIdMap = opIdMap; popList = Lists.newArrayList(); } @@ -48,6 +53,11 @@ public class PhysicalPlanCreator { return context; } + public int getOperatorId(Prel prel){ + OpId id = opIdMap.get(prel); + return id.getAsSingleInt(); + } + public PhysicalPlan build(Prel rootPrel, boolean forceRebuild) { if (plan != null && !forceRebuild) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java index 1aa34d3..70dca25 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java @@ -57,6 +57,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{ PhysicalOperator childPOP = child.getPhysicalOperator(creator); Project p = new Project(this.getProjectExpressions(new DrillParseContext()), childPOP); + p.setOperatorId(creator.getOperatorId(this)); return p; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java index 74cd7a9..8461e24 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java @@ -73,6 +73,7 @@ public class ScanPrel extends AbstractRelNode implements DrillScanPrel { @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + groupScan.setOperatorId(creator.getOperatorId(this)); return groupScan; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java index 36bf796..d02ed44 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java @@ -51,6 +51,8 @@ public class ScreenPrel extends DrillScreenRelBase implements Prel { PhysicalOperator childPOP = child.getPhysicalOperator(creator); Screen s = new Screen(childPOP, creator.getContext().getCurrentEndpoint()); + s.setOperatorId(creator.getOperatorId(this)); + return s; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java index 63cdcaa..fd07749 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java @@ -32,7 +32,10 @@ public class SelectionVectorRemoverPrel extends SinglePrel{ @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - return new SelectionVectorRemover( ((Prel)getChild()).getPhysicalOperator(creator)); + SelectionVectorRemover r = new SelectionVectorRemover( ((Prel)getChild()).getPhysicalOperator(creator)); + r.setOperatorId(creator.getOperatorId(this)); + return r; + } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java index 05d6e89..99d99a7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java @@ -89,6 +89,8 @@ public class SingleMergeExchangePrel extends ExchangePrel { if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP; SingleMergeExchange g = new SingleMergeExchange(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType())); + g.setOperatorId(creator.getOperatorId(this)); + return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java index d582bc6..fa5e900 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java @@ -73,6 +73,7 @@ public class SortPrel extends SortRel implements Prel { PhysicalOperator childPOP = child.getPhysicalOperator(creator); Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false); + g.setOperatorId(creator.getOperatorId(this)); return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java index 5fb758a..a95d926 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrel.java @@ -109,7 +109,7 @@ public class StreamAggPrel extends AggregateRelBase implements Prel{ Prel child = (Prel) this.getChild(); StreamingAggregate g = new StreamingAggregate(child.getPhysicalOperator(creator), keys.toArray(new NamedExpression[keys.size()]), exprs.toArray(new NamedExpression[exprs.size()]), 1.0f); - + g.setOperatorId(creator.getOperatorId(this)); return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java index 3c8cfe0..0067926 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java @@ -61,6 +61,7 @@ public class TopNPrel extends SinglePrel { PhysicalOperator childPOP = child.getPhysicalOperator(creator); TopN topN = new TopN(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false, this.limit); + topN.setOperatorId(creator.getOperatorId(this)); return topN; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java index 5d6b85d..f14df72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnionExchangePrel.java @@ -81,6 +81,7 @@ public class UnionExchangePrel extends ExchangePrel { if(PrelUtil.getSettings(getCluster()).isSingleMode()) return childPOP; UnionExchange g = new UnionExchange(childPOP); + g.setOperatorId(creator.getOperatorId(this)); return g; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java index 4cefeb5..e948125 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java @@ -43,7 +43,10 @@ public class WriterPrel extends DrillWriterRelBase implements Prel { @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { Prel child = (Prel) this.getChild(); - return getCreateTableEntry().getWriter(child.getPhysicalOperator(creator)); + PhysicalOperator g = getCreateTableEntry().getWriter(child.getPhysicalOperator(creator)); + g.setOperatorId(creator.getOperatorId(this)); + + return g; } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java index 169deca..2ab6c74 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/PrelSequencer.java @@ -42,14 +42,17 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt if (rel == null) { return null; } - PrelSequencer s = new PrelSequencer(); final StringWriter sw = new StringWriter(); - final RelWriter planWriter = new NumberingRelWriter(s.go(rel), new PrintWriter(sw), explainlevel); + final RelWriter planWriter = new NumberingRelWriter(getIdMap(rel), new PrintWriter(sw), explainlevel); rel.explain(planWriter); return sw.toString(); } + public static Map<Prel, OpId> getIdMap(Prel rel){ + PrelSequencer s = new PrelSequencer(); + return s.go(rel); + } static class Frag implements Iterable<Frag>{ @@ -110,7 +113,7 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt } - static class OpId{ + public static class OpId{ int fragmentId; int opId; public OpId(int fragmentId, int opId) { @@ -118,6 +121,21 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt this.fragmentId = fragmentId; this.opId = opId; } + + + public int getFragmentId() { + return fragmentId; + } + + + public int getOpId() { + return opId; + } + + public int getAsSingleInt(){ + return (fragmentId << 16) + opId; + } + @Override public int hashCode() { final int prime = 31; @@ -172,19 +190,27 @@ public class PrelSequencer implements PrelVisitor<Void, PrelSequencer.Frag, Runt } // for each fragment, do a dfs of operators to assign operator ids. - Map<Prel, OpId> ids = Maps.newHashMap(); + Map<Prel, OpId> ids = Maps.newIdentityHashMap(); + + ids.put(rootFrag.root, new OpId(0, 0)); for(Frag f : frags){ - int id = 0; + int id = 1; Queue<Prel> ops = Lists.newLinkedList(); ops.add(f.root); while(!ops.isEmpty()){ Prel p = ops.remove(); - if(p instanceof ExchangePrel && p != f.root) continue; - ids.put(p, new OpId(f.majorFragmentId, id++) ); + boolean isExchange = p instanceof ExchangePrel; + + if(p != f.root){ // we account for exchanges as receviers to guarantee unique identifiers. + ids.put(p, new OpId(f.majorFragmentId, id++) ); + } + - List<Prel> children = Lists.reverse(Lists.newArrayList(p.iterator())); - for(Prel child : children){ - ops.add(child); + if(!isExchange || p == f.root){ + List<Prel> children = Lists.reverse(Lists.newArrayList(p.iterator())); + for(Prel child : children){ + ops.add(child); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 1cb3cfb..b06432a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -132,8 +132,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler { } protected PhysicalOperator convertToPop(Prel prel) throws IOException { - - PhysicalPlanCreator creator = new PhysicalPlanCreator(context); + PhysicalPlanCreator creator = new PhysicalPlanCreator(context, PrelSequencer.getIdMap(prel)); PhysicalOperator op = prel.getPhysicalOperator(creator); return op; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ebf3d340/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java index 3040de2..ad1d6b6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java @@ -12,6 +12,7 @@ import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.config.Screen; +import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.store.mock.MockSubScanPOP; @@ -27,10 +28,12 @@ public class TestOpSerialization { DrillConfig c = DrillConfig.create(); PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); MockSubScanPOP s = new MockSubScanPOP("abc", null); - s.setOperatorId(2); + s.setOperatorId(3); Filter f = new Filter(s, new ValueExpressions.BooleanExpression("true", ExpressionPosition.UNKNOWN), 0.1f); - f.setOperatorId(1); - Screen screen = new Screen(f, CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + f.setOperatorId(2); + UnionExchange e = new UnionExchange(f); + e.setOperatorId(1); + Screen screen = new Screen(e, CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); screen.setOperatorId(0); boolean reversed = false; @@ -38,6 +41,7 @@ public class TestOpSerialization { List<PhysicalOperator> pops = Lists.newArrayList(); pops.add(s); + pops.add(e); pops.add(f); pops.add(screen);
