Repository: tajo
Updated Branches:
  refs/heads/master fdb76ed2c -> 7b0af7448
TAJO-2082: Aggregation on a derived table which includes union can cause incorrect result.

Closes #969

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7b0af744
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7b0af744
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7b0af744

Branch: refs/heads/master
Commit: 7b0af74483521615f302d2a3376556dad325297f
Parents: fdb76ed
Author: Jihoon Son <[email protected]>
Authored: Fri Mar 4 16:12:16 2016 +0900
Committer: Jihoon Son <[email protected]>
Committed: Fri Mar 4 16:12:16 2016 +0900

----------------------------------------------------------------------
 CHANGES | 3 +
 .../TooLargeInputForCrossJoinException.java | 2 +-
 .../apache/tajo/util/graph/DirectedGraph.java | 3 +-
 .../tajo/util/graph/DirectedGraphVisitor.java | 4 +-
 .../tajo/util/graph/SimpleDirectedGraph.java | 13 +-
 .../util/graph/TestSimpleDirectedGraph.java | 3 +-
 .../tajo/engine/planner/global/DataChannel.java | 8 +
 .../engine/planner/global/ExecutionBlock.java | 161 +++++++++-----
 .../engine/planner/global/GlobalPlanner.java | 21 +-
 .../tajo/engine/planner/global/MasterPlan.java | 51 ++++-
 .../global/builder/DistinctGroupbyBuilder.java | 3 +-
 .../rewriter/rules/BroadcastJoinRule.java | 9 +-
 .../planner/physical/ExternalSortExec.java | 2 +-
 .../apache/tajo/querymaster/Repartitioner.java | 29 ++-
 .../java/org/apache/tajo/querymaster/Stage.java | 222 +++++++++++--------
 .../plan/rewrite/SelfDescSchemaBuildPhase.java | 2 +-
 .../org/apache/tajo/plan/util/PlannerUtil.java | 2 +-
 17 files changed, 343 insertions(+), 195 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7892aa2..592bfe3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -106,6 +106,9 @@ Release 0.12.0 - unreleased BUG FIXES + 
TAJO-2082: Aggregation on a derived table which includes union can cause + incorrect result. (jihoon) + TAJO-2081: Incorrect task locality on single node. (jinho) TAJO-2080: ArrayIndexOutOfBoundsException when performing aggregation on an http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java index 55d5f46..d958cc6 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java @@ -33,6 +33,6 @@ public class TooLargeInputForCrossJoinException extends TajoException { } public TooLargeInputForCrossJoinException(String[] relations, long currentBroadcastThreshold) { - super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " MB"); + super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " KB"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java index d8d5ced..ae61ed8 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java @@ -19,6 +19,7 @@ package org.apache.tajo.util.graph; import org.apache.tajo.annotation.Nullable; +import 
org.apache.tajo.exception.TajoException; import java.util.List; @@ -60,5 +61,5 @@ public interface DirectedGraph<V, E> extends Graph<V, E> { /** * It visits all vertices in a post-order traverse way. */ - <CONTEXT> void accept(CONTEXT context, V src, DirectedGraphVisitor<CONTEXT, V> visitor); + <CONTEXT> void accept(CONTEXT context, V src, DirectedGraphVisitor<CONTEXT, V> visitor) throws TajoException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java index 8e0ce87..86f1856 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java @@ -18,8 +18,10 @@ package org.apache.tajo.util.graph; +import org.apache.tajo.exception.TajoException; + import java.util.Stack; public interface DirectedGraphVisitor<CONTEXT, V> { - void visit(CONTEXT context, Stack<V> stack, V v); + void visit(CONTEXT context, Stack<V> stack, V v) throws TajoException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java index b5e36e7..e1ba137 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java @@ -21,6 +21,8 @@ package org.apache.tajo.util.graph; import com.google.common.collect.ImmutableList; import 
com.google.common.collect.Lists; import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.util.TUtil; import java.util.*; @@ -219,13 +221,14 @@ public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> { } @Override - public <CONTEXT> void accept(CONTEXT context, V source, DirectedGraphVisitor<CONTEXT, V> visitor) { + public <CONTEXT> void accept(CONTEXT context, V source, DirectedGraphVisitor<CONTEXT, V> visitor) + throws TajoException { Stack<V> stack = new Stack<>(); visitRecursive(context, stack, source, visitor); } private <CONTEXT> void visitRecursive(CONTEXT context, Stack<V> stack, V current, - DirectedGraphVisitor<CONTEXT, V> visitor) { + DirectedGraphVisitor<CONTEXT, V> visitor) throws TajoException { stack.push(current); for (V child : getChilds(current)) { visitRecursive(context, stack, child, visitor); @@ -249,7 +252,11 @@ public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> { public String toStringGraph(V vertex) { StringBuilder sb = new StringBuilder(); QueryGraphTopologyStringBuilder visitor = new QueryGraphTopologyStringBuilder(); - accept(null, vertex, visitor); + try { + accept(null, vertex, visitor); + } catch (TajoException e) { + throw new TajoRuntimeException(e); + } Stack<DepthString> depthStrings = visitor.getDepthStrings(); while(!depthStrings.isEmpty()) { sb.append(printDepthString(depthStrings.pop())); http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java b/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java index 45cde2a..9ebb69b 100644 --- 
a/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java +++ b/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java @@ -20,6 +20,7 @@ package org.apache.tajo.util.graph; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.util.graph.DirectedGraphVisitor; import org.apache.tajo.util.graph.SimpleDirectedGraph; import org.junit.Test; @@ -34,7 +35,7 @@ public class TestSimpleDirectedGraph { private static final Log LOG = LogFactory.getLog(TestSimpleDirectedGraph.class); @Test - public final void test() { + public final void test() throws TajoException { SimpleDirectedGraph<String, Integer> graph = new SimpleDirectedGraph<>(); // root http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java index c779d2f..10e9973 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java @@ -99,6 +99,14 @@ public class DataChannel { return shuffleType; } + public boolean isHashShuffle() { + return shuffleType == ShuffleType.HASH_SHUFFLE || shuffleType == ShuffleType.SCATTERED_HASH_SHUFFLE; + } + + public boolean isRangeShuffle() { + return shuffleType == ShuffleType.RANGE_SHUFFLE; + } + public boolean needShuffle() { return shuffleType != ShuffleType.NONE_SHUFFLE; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java index 573f5aa..fde05c5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java @@ -16,7 +16,10 @@ package org.apache.tajo.engine.planner.global; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import java.util.*; @@ -30,19 +33,14 @@ import java.util.*; public class ExecutionBlock { private ExecutionBlockId executionBlockId; private LogicalNode plan = null; - private StoreTableNode store = null; - private List<ScanNode> scanlist = new ArrayList<>(); private Enforcer enforcer = new Enforcer(); - // Actual ScanNode's ExecutionBlockId -> Delegated ScanNode's ExecutionBlockId. private Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = new HashMap<>(); - private boolean hasJoinPlan; - private boolean hasUnionPlan; - private boolean isUnionOnly; - private Map<String, ScanNode> broadcastRelations = new HashMap<>(); + private PlanContext planContext; + /* * An execution block is null-supplying or preserved-row when its output is used as an input for outer join. * These properties are decided based on the type of parent execution block's outer join. 
@@ -98,52 +96,16 @@ public class ExecutionBlock { return executionBlockId; } - public void setPlan(LogicalNode plan) { - hasJoinPlan = false; - hasUnionPlan = false; - isUnionOnly = true; - this.scanlist.clear(); + public void setPlan(LogicalNode plan) throws TajoException { this.plan = plan; if (plan == null) { return; } - LogicalNode node = plan; - ArrayList<LogicalNode> s = new ArrayList<>(); - s.add(node); - while (!s.isEmpty()) { - node = s.remove(s.size()-1); - // TODO: the below code should be improved to handle every case - if (isUnionOnly && node.getType() != NodeType.ROOT && node.getType() != NodeType.TABLE_SUBQUERY && - node.getType() != NodeType.SCAN && node.getType() != NodeType.PARTITIONS_SCAN && - node.getType() != NodeType.UNION && node.getType() != NodeType.PROJECTION) { - isUnionOnly = false; - } - if (node instanceof UnaryNode) { - UnaryNode unary = (UnaryNode) node; - s.add(s.size(), unary.getChild()); - } else if (node instanceof BinaryNode) { - BinaryNode binary = (BinaryNode) node; - if (binary.getType() == NodeType.JOIN) { - hasJoinPlan = true; - } else if (binary.getType() == NodeType.UNION) { - hasUnionPlan = true; - } - s.add(s.size(), binary.getLeftChild()); - s.add(s.size(), binary.getRightChild()); - } else if (node instanceof ScanNode) { - scanlist.add((ScanNode)node); - } else if (node instanceof TableSubQueryNode) { - TableSubQueryNode subQuery = (TableSubQueryNode) node; - s.add(s.size(), subQuery.getSubQuery()); - } else if (node instanceof StoreTableNode) { - store = (StoreTableNode)node; - } - } - if (!hasUnionPlan) { - isUnionOnly = false; - } + final PlanVisitor visitor = new PlanVisitor(); + planContext = new PlanContext(); + visitor.visit(planContext, null, null, plan, new Stack<>()); } public void addUnionScan(ExecutionBlockId realScanEbId, ExecutionBlockId delegatedScanEbId) { @@ -163,12 +125,12 @@ public class ExecutionBlock { } public StoreTableNode getStoreTableNode() { - return store; + return planContext.store; } 
public int getNonBroadcastRelNum() { int nonBroadcastRelNum = 0; - for (ScanNode scanNode : scanlist) { + for (ScanNode scanNode : planContext.scanlist) { if (!broadcastRelations.containsKey(scanNode.getCanonicalName())) { nonBroadcastRelNum++; } @@ -177,19 +139,23 @@ public class ExecutionBlock { } public ScanNode [] getScanNodes() { - return this.scanlist.toArray(new ScanNode[scanlist.size()]); + return planContext.scanlist.toArray(new ScanNode[planContext.scanlist.size()]); } public boolean hasJoin() { - return hasJoinPlan; + return planContext.hasJoinPlan; } public boolean hasUnion() { - return hasUnionPlan; + return planContext.hasUnionPlan; + } + + public boolean hasAgg() { + return planContext.hasAggPlan; } public boolean isUnionOnly() { - return isUnionOnly; + return planContext.isUnionOnly(); } public void addBroadcastRelation(ScanNode relationNode) { @@ -235,4 +201,93 @@ public class ExecutionBlock { public boolean isPreservedRow() { return preservedRow; } + + private class PlanContext { + StoreTableNode store = null; + + List<ScanNode> scanlist = new ArrayList<>(); + + boolean hasJoinPlan = false; + boolean hasUnionPlan = false; + boolean hasAggPlan = false; + boolean hasSortPlan = false; + + boolean isUnionOnly() { + return hasUnionPlan && !hasJoinPlan && !hasAggPlan && !hasSortPlan; + } + } + + private class PlanVisitor extends BasicLogicalPlanVisitor<PlanContext, LogicalNode> { + + @Override + public LogicalNode visitJoin(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node, + Stack<LogicalNode> stack) throws TajoException { + context.hasJoinPlan = true; + return super.visitJoin(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitGroupBy(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node, + Stack<LogicalNode> stack) throws TajoException { + context.hasAggPlan = true; + return super.visitGroupBy(context, plan, block, node, stack); + } + + @Override + public 
LogicalNode visitWindowAgg(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node, + Stack<LogicalNode> stack) throws TajoException { + context.hasAggPlan = true; + return super.visitWindowAgg(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitDistinctGroupby(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + DistinctGroupbyNode node, Stack<LogicalNode> stack) throws TajoException { + context.hasAggPlan = true; + return super.visitDistinctGroupby(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitSort(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, SortNode node, + Stack<LogicalNode> stack) throws TajoException { + context.hasSortPlan = true; + return super.visitSort(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitUnion(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node, + Stack<LogicalNode> stack) throws TajoException { + context.hasUnionPlan = true; + return super.visitUnion(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitStoreTable(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, StoreTableNode node, + Stack<LogicalNode> stack) throws TajoException { + context.store = node; + return super.visitStoreTable(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node, + Stack<LogicalNode> stack) throws TajoException { + context.scanlist.add(node); + return super.visitScan(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitPartitionedTableScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + PartitionedTableScanNode node, Stack<LogicalNode> stack) + throws TajoException { + context.scanlist.add(node); + return super.visitPartitionedTableScan(context, plan, block, 
node, stack); + } + + @Override + public LogicalNode visitIndexScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, IndexScanNode node, + Stack<LogicalNode> stack) throws TajoException { + context.scanlist.add(node); + return super.visitIndexScan(context, plan, block, node, stack); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index bf41d5b..463d015 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -199,7 +199,7 @@ public class GlobalPlanner { } private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode, - ExecutionBlock leftBlock, ExecutionBlock rightBlock) { + ExecutionBlock leftBlock, ExecutionBlock rightBlock) throws TajoException { MasterPlan masterPlan = context.plan; ExecutionBlock currentBlock; @@ -596,7 +596,7 @@ public class GlobalPlanner { } private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock, - GroupbyNode groupbyNode) { + GroupbyNode groupbyNode) throws TajoException { MasterPlan masterPlan = context.plan; ExecutionBlock currentBlock; @@ -675,7 +675,7 @@ public class GlobalPlanner { } private ExecutionBlock buildGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock, - GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) { + GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) throws TajoException { DataChannel lastDataChannel = null; // It pushes down the first phase group-by operator into all child blocks. 
@@ -715,7 +715,7 @@ public class GlobalPlanner { } private ExecutionBlock buildTwoPhaseGroupby(MasterPlan masterPlan, ExecutionBlock latestBlock, - GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) { + GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) throws TajoException { ExecutionBlock childBlock = latestBlock; childBlock.setPlan(firstPhaseGroupby); @@ -780,7 +780,7 @@ public class GlobalPlanner { return firstPhaseGroupBy; } - private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) { + private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) throws TajoException { MasterPlan masterPlan = context.plan; ExecutionBlock currentBlock; @@ -860,7 +860,7 @@ public class GlobalPlanner { */ private ExecutionBlock buildShuffleAndStorePlanNoPartitionedTableWithUnion(GlobalPlanContext context, StoreTableNode currentNode, - ExecutionBlock childBlock) { + ExecutionBlock childBlock) throws TajoException { for (ExecutionBlock grandChildBlock : context.plan.getChilds(childBlock)) { StoreTableNode copy = PlannerUtil.clone(context.plan.getLogicalPlan(), currentNode); copy.setChild(grandChildBlock.getPlan()); @@ -875,7 +875,7 @@ public class GlobalPlanner { */ private ExecutionBlock buildShuffleAndStorePlanToPartitionedTableWithUnion(GlobalPlanContext context, StoreTableNode currentNode, - ExecutionBlock lastBlock) { + ExecutionBlock lastBlock) throws TajoException { MasterPlan masterPlan = context.plan; DataChannel lastChannel = null; @@ -899,7 +899,7 @@ public class GlobalPlanner { */ private ExecutionBlock buildShuffleAndStorePlanToPartitionedTable(GlobalPlanContext context, StoreTableNode currentNode, - ExecutionBlock lastBlock) { + ExecutionBlock lastBlock) throws TajoException { MasterPlan masterPlan = context.plan; ExecutionBlock nextBlock = masterPlan.newExecutionBlock(); @@ -920,7 +920,7 @@ public class GlobalPlanner { private 
ExecutionBlock buildNoPartitionedStorePlan(GlobalPlanContext context, StoreTableNode currentNode, - ExecutionBlock childBlock) { + ExecutionBlock childBlock) throws TajoException { if (hasUnionChild(currentNode)) { // when the below is union return buildShuffleAndStorePlanNoPartitionedTableWithUnion(context, currentNode, childBlock); } else { @@ -1240,7 +1240,8 @@ public class GlobalPlanner { return node; } - private LogicalNode handleUnaryNode(GlobalPlanContext context, LogicalNode child, LogicalNode node) { + private LogicalNode handleUnaryNode(GlobalPlanContext context, LogicalNode child, LogicalNode node) + throws TajoException { ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID()); execBlock.setPlan(node); context.execBlockMap.put(node.getPID(), execBlock); http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java index feaba76..a7b03e7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java @@ -23,8 +23,10 @@ package org.apache.tajo.engine.planner.global; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; +import org.apache.tajo.annotation.NotNull; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.serder.PlanProto.ShuffleType; @@ -47,6 +49,44 @@ public class MasterPlan { private SimpleDirectedGraph<ExecutionBlockId, DataChannel> execBlockGraph = new 
SimpleDirectedGraph<>(); + private Map<ExecutionBlockId, ShuffleContext> shuffleInfo = new HashMap<>(); + + /** + * + */ + public class ShuffleContext { + ExecutionBlockId parentEbId; + int partitionNum; + + public ShuffleContext(ExecutionBlockId parentEbId, int partitionNum) { + this.parentEbId = parentEbId; + this.partitionNum = partitionNum; + } + + public ExecutionBlockId getParentEbId() { + return parentEbId; + } + + public int getPartitionNum() { + return partitionNum; + } + } + + /** + * + * @param ebId + * @param partitionNum + */ + public void addShuffleInfo(ExecutionBlockId ebId, int partitionNum) { + ExecutionBlockId parentId = getParent(getExecBlock(ebId)).getId(); + shuffleInfo.put(parentId, new ShuffleContext(ebId, partitionNum)); + } + + public Optional<ShuffleContext> getShuffleInfo(ExecutionBlockId ebId) { + ExecutionBlockId parentId = getParent(getExecBlock(ebId)).getId(); + return shuffleInfo.containsKey(parentId) ? Optional.of(shuffleInfo.get(parentId)) : Optional.empty(); + } + public ExecutionBlockId newExecutionBlockId() { return new ExecutionBlockId(queryId, nextId.incrementAndGet()); } @@ -215,14 +255,14 @@ public class MasterPlan { return getChild(executionBlock.getId(), idx); } - public <CONTEXT> void accept(CONTEXT context, ExecutionBlockId v, DirectedGraphVisitor<CONTEXT, ExecutionBlockId> visitor) { + public <CONTEXT> void accept(CONTEXT context, ExecutionBlockId v, DirectedGraphVisitor<CONTEXT, + ExecutionBlockId> visitor) throws TajoException { execBlockGraph.accept(context, v, visitor); } @Override public String toString() { StringBuilder sb = new StringBuilder(); - ExecutionBlockCursor cursor = new ExecutionBlockCursor(this); sb.append("-------------------------------------------------------------------------------\n"); sb.append("Execution Block Graph (TERMINAL - " + getTerminalBlock() + ")\n"); sb.append("-------------------------------------------------------------------------------\n"); @@ -285,12 +325,7 @@ public class 
MasterPlan { sb.append("\n[Enforcers]\n"); int i = 0; List<EnforceProperty> enforceProperties = block.getEnforcer().getProperties(); - Collections.sort(enforceProperties, new Comparator<EnforceProperty>() { - @Override - public int compare(EnforceProperty o1, EnforceProperty o2) { - return o1.toString().compareTo(o2.toString()); - } - }); + Collections.sort(enforceProperties, (e1, e2) -> e1.toString().compareTo(e2.toString())); for (EnforceProperty enforce : enforceProperties) { sb.append(" ").append(i++).append(": "); sb.append(Enforcer.toString(enforce)); http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java index 592ea2b..8f7673b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java @@ -30,6 +30,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext; import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; @@ -711,7 +712,7 @@ public class DistinctGroupbyBuilder { private ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock, DistinctGroupbyNode 
firstPhaseGroupBy, - DistinctGroupbyNode secondPhaseGroupBy) { + DistinctGroupbyNode secondPhaseGroupBy) throws TajoException { DataChannel lastDataChannel = null; // It pushes down the first phase group-by operator into all child blocks. http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index a19704b..d390740 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -175,7 +175,8 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { } @Override - public void visit(Context context, Stack<ExecutionBlockId> stack, ExecutionBlockId executionBlockId) { + public void visit(Context context, Stack<ExecutionBlockId> stack, ExecutionBlockId executionBlockId) + throws TajoException { ExecutionBlock current = plan.getExecBlock(executionBlockId); if (plan.isLeaf(current)) { @@ -209,7 +210,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { * * @param current */ - private void visitNonLeafNode(Context context, ExecutionBlock current) { + private void visitNonLeafNode(Context context, ExecutionBlock current) throws TajoException { // At non-leaf execution blocks, merge broadcastable children's plan with the current plan. 
if (!plan.isTerminal(current)) { @@ -423,7 +424,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { * @param parent parent block who has join nodes * @return */ - private ExecutionBlock mergeTwoPhaseJoinIfPossible(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) { + private ExecutionBlock mergeTwoPhaseJoinIfPossible(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) throws TajoException { ScanNode scanForChild = findScanForChildEb(child, parent); parentFinder.set(scanForChild); @@ -446,7 +447,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { } private void addUnionNodeIfNecessary(Map<ExecutionBlockId, ExecutionBlockId> unionScanMap, MasterPlan plan, - ExecutionBlock child, ExecutionBlock current) { + ExecutionBlock child, ExecutionBlock current) throws TajoException { if (unionScanMap != null) { List<ExecutionBlockId> unionScans = new ArrayList<>(); ExecutionBlockId representativeId = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 0e89928..ff629c3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -85,7 +85,7 @@ public class ExternalSortExec extends SortExec { /** the defaultFanout of external sort */ private final int defaultFanout; /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. 
*/ - private int sortBufferBytesNum; + private final long sortBufferBytesNum; /** the number of available cores */ private final int allocatedCoreNum; /** If there are available multiple cores, it tries parallel merge. */ http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index ec7ed2d..4e4251a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -50,7 +50,6 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.querymaster.Task.IntermediateEntry; import org.apache.tajo.querymaster.Task.PullHost; import org.apache.tajo.storage.*; @@ -321,12 +320,12 @@ public class Repartitioner { if (tbNameToInterm.containsKey(scanEbId)) { tbNameToInterm.get(scanEbId).add(intermediateEntry); } else { - tbNameToInterm.put(scanEbId, new ArrayList<>(Arrays.asList(intermediateEntry))); + tbNameToInterm.put(scanEbId, Lists.newArrayList(intermediateEntry)); } } else { Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm = new HashMap<>(); - tbNameToInterm.put(scanEbId, new ArrayList<>(Arrays.asList(intermediateEntry))); + tbNameToInterm.put(scanEbId, Lists.newArrayList(intermediateEntry)); hashEntries.put(intermediateEntry.getPartId(), tbNameToInterm); } } @@ -606,10 +605,9 @@ public class Repartitioner { MasterPlan masterPlan, Stage stage, int maxNum) throws IOException { DataChannel 
channel = masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0); - if (channel.getShuffleType() == HASH_SHUFFLE - || channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { + if (channel.isHashShuffle()) { scheduleHashShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum); - } else if (channel.getShuffleType() == RANGE_SHUFFLE) { + } else if (channel.isRangeShuffle()) { scheduleRangeShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum); } else { throw new TajoInternalError("Cannot support partition type"); @@ -698,10 +696,8 @@ public class Repartitioner { TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); if (LOG.isDebugEnabled()) { - if (ranges != null) { - for (TupleRange eachRange : ranges) { - LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); - } + for (TupleRange eachRange : ranges) { + LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); } } } @@ -999,7 +995,7 @@ public class Repartitioner { int partId = eachInterm.getPartId(); List<IntermediateEntry> partitionInterms = partitionIntermMap.get(partId); if (partitionInterms == null) { - partitionInterms = Arrays.asList(eachInterm); + partitionInterms = Lists.newArrayList(eachInterm); partitionIntermMap.put(partId, partitionInterms); } else { partitionInterms.add(eachInterm); @@ -1078,7 +1074,7 @@ public class Repartitioner { fetchListVolume = 0; } FetchImpl fetch = new FetchImpl(fetchName, currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE, - ebId, currentInterm.getPartId(), Arrays.asList(currentInterm)); + ebId, currentInterm.getPartId(), Lists.newArrayList(currentInterm)); fetch.setOffset(eachSplit.getFirst()); fetch.setLength(eachSplit.getSecond()); fetchListForSingleTask.add(fetch.getProto()); @@ -1219,7 +1215,7 @@ public class Repartitioner { if (hashed.containsKey(entry.getPartId())) { hashed.get(entry.getPartId()).add(entry); } else { - 
hashed.put(entry.getPartId(), Arrays.asList(entry)); + hashed.put(entry.getPartId(), Lists.newArrayList(entry)); } } @@ -1235,7 +1231,7 @@ public class Repartitioner { if (hashed.containsKey(host)) { hashed.get(host).add(entry); } else { - hashed.put(host, Arrays.asList(entry)); + hashed.put(host, Lists.newArrayList(entry)); } } @@ -1258,12 +1254,12 @@ public class Repartitioner { } // set the partition number for group by and sort - if (channel.getShuffleType() == HASH_SHUFFLE) { + if (channel.isHashShuffle()) { if (execBlock.getPlan().getType() == NodeType.GROUP_BY || execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) { keys = channel.getShuffleKeys(); } - } else if (channel.getShuffleType() == RANGE_SHUFFLE) { + } else if (channel.isRangeShuffle()) { if (execBlock.getPlan().getType() == NodeType.SORT) { SortNode sort = (SortNode) execBlock.getPlan(); keys = new Column[sort.getSortKeys().length]; @@ -1278,6 +1274,7 @@ public class Repartitioner { channel.setShuffleOutputNum(1); } else { channel.setShuffleKeys(keys); + // NOTE: desiredNum is not used in Sort anymore. 
channel.setShuffleOutputNum(desiredNum); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index f1813c9..08ff184 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -42,14 +42,17 @@ import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.planner.global.MasterPlan.ShuffleContext; import org.apache.tajo.error.Errors.SerializedException; import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TaskState; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.util.PlannerUtil; @@ -855,6 +858,7 @@ public class Stage implements EventHandler<StageEvent> { ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock()); DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId()); setShuffleIfNecessary(stage, channel); + // TODO: verify changed shuffle plan initTaskScheduler(stage); // execute pre-processing asyncronously 
stage.getContext().getQueryMasterContext().getSingleEventExecutor() @@ -920,8 +924,8 @@ public class Stage implements EventHandler<StageEvent> { * methods and the number of partitions to a given Stage. */ private static void setShuffleIfNecessary(Stage stage, DataChannel channel) { - if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) { - int numTasks = calculateShuffleOutputNum(stage, channel); + if (channel.isHashShuffle()) { + int numTasks = calculateShuffleOutputNum(stage); Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel); } } @@ -933,122 +937,154 @@ public class Stage implements EventHandler<StageEvent> { * @param stage * @return */ - public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) { + public static int calculateShuffleOutputNum(Stage stage) { MasterPlan masterPlan = stage.getMasterPlan(); - ExecutionBlock parent = masterPlan.getParent(stage.getBlock()); - LogicalNode grpNode = null; - if (parent != null) { - grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY); - if (grpNode == null) { - grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY); + // For test + if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) { + int partitionNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM); + LOG.info(stage.getId() + ", The determined number of partitions is " + partitionNum + " for test"); + return partitionNum; + } + + Optional<ShuffleContext> optional = masterPlan.getShuffleInfo(stage.getId()); + if (optional.isPresent()) { + LOG.info("# of partitions is determined as " + optional.get().getPartitionNum() + + "to match with sibling eb's partition number"); + return optional.get().getPartitionNum(); + + } else { + ExecutionBlock parent = masterPlan.getParent(stage.getBlock()); + int partitionNum; + + if (parent != null) { + // We assume this execution block the first stage of join if two or more tables are included in this 
block, + if (parent.hasJoin()) { + if (parent.getNonBroadcastRelNum() > 1) { + // repartition join + partitionNum = calculatePartitionNumForRepartitionJoin(parent, stage); + LOG.info(stage.getId() + ", The determined number of partitions for repartition join is " + partitionNum); + } else { + // broadcast join + // partition number is calculated using the volume of the large table + partitionNum = calculatePartitionNumDefault(parent, stage); + LOG.info(stage.getId() + ", The determined number of partitions for broadcast join is " + partitionNum); + } + + } else { + // Is this stage the first step of group-by? + if (parent.hasAgg()) { + LogicalNode grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY, + NodeType.DISTINCT_GROUP_BY, NodeType.WINDOW_AGG); + if (grpNode == null) { + throw new TajoInternalError("Cannot find aggregation plan for " + stage.getId()); + } + + if (!hasGroupKeys(stage, grpNode)) { + LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); + partitionNum = 1; + } else { + partitionNum = calculatePartitionNumForAgg(parent, stage); + LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + partitionNum); + } + + } else { + // NOTE: the below code might be executed during sort, but the partition number is not used anymore for sort. + LOG.info("============>>>>> Unexpected Case! <<<<<================"); + partitionNum = calculatePartitionNumDefault(parent, stage); + LOG.info(stage.getId() + ", The determined number of partitions is " + partitionNum); + } + + } + } else { + // This case means that the parent eb does not exist even though data shuffle is required after the current eb. 
+ throw new TajoInternalError("Cannot find parent execution block of " + stage.block.getId()); } + + // Record the partition number for sibling execution blocks + masterPlan.addShuffleInfo(stage.getId(), partitionNum); + return partitionNum; } + } + + private static int calculatePartitionNumForRepartitionJoin(ExecutionBlock parent, Stage currentStage) { + List<ExecutionBlock> childs = currentStage.masterPlan.getChilds(parent); - // We assume this execution block the first stage of join if two or more tables are included in this block, - if (parent != null && (parent.getNonBroadcastRelNum()) >= 2) { - List<ExecutionBlock> childs = masterPlan.getChilds(parent); + // for outer + ExecutionBlock outer = childs.get(0); + long outerVolume = getInputVolume(currentStage.masterPlan, currentStage.context, outer); - // for outer - ExecutionBlock outer = childs.get(0); - long outerVolume = getInputVolume(stage.masterPlan, stage.context, outer); + // for inner + ExecutionBlock inner = childs.get(1); + long innerVolume = getInputVolume(currentStage.masterPlan, currentStage.context, inner); + LOG.info(currentStage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, " + + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB"); - // for inner - ExecutionBlock inner = childs.get(1); - long innerVolume = getInputVolume(stage.masterPlan, stage.context, inner); - LOG.info(stage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, " - + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB"); + long bigger = Math.max(outerVolume, innerVolume); - long bigger = Math.max(outerVolume, innerVolume); + int mb = (int) Math.ceil((double) bigger / 1048576); + LOG.info(currentStage.getId() + ", Bigger Table's volume is approximately " + mb + " MB"); - int mb = (int) Math.ceil((double) bigger / 1048576); - LOG.info(stage.getId() + ", Bigger Table's volume is approximately " + mb + " MB"); + return (int) 
Math.ceil((double) mb / + currentStage.masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE)); + } - int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE)); + private static int calculatePartitionNumForAgg(ExecutionBlock parent, Stage stage) { + int volumeByMB = getInputVolumeMB(parent, stage); + LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB"); + // determine the number of task + return (int) Math.ceil((double) volumeByMB / + stage.masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE)); - if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) { - taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM); - LOG.warn("!!!!! TESTCASE MODE !!!!!"); - } + } - // The shuffle output numbers of join may be inconsistent by execution block order. - // Thus, we need to compare the number with DataChannel output numbers. - // If the number is right, the number and DataChannel output numbers will be consistent. - int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0; - for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) { - outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum()); - } - for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) { - innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum()); + private static boolean hasGroupKeys(Stage currentStage, LogicalNode aggNode) { + if (aggNode.getType() == NodeType.GROUP_BY) { + return ((GroupbyNode)aggNode).getGroupingColumns().length > 0; + } else if (aggNode.getType() == NodeType.DISTINCT_GROUP_BY) { + // Find current distinct stage node. 
+ DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(currentStage.getBlock().getPlan(), + NodeType.DISTINCT_GROUP_BY); + if (distinctNode == null) { + LOG.warn(currentStage.getId() + ", Can't find current DistinctGroupbyNode"); + distinctNode = (DistinctGroupbyNode)aggNode; } - if (outerShuffleOutputNum != innerShuffleOutputNum - && taskNum != outerShuffleOutputNum - && taskNum != innerShuffleOutputNum) { - LOG.info(stage.getId() + ", Change determined number of join partitions cause difference of outputNum" + - ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) + - ", outerShuffleOutptNum=" + outerShuffleOutputNum + - ", innerShuffleOutputNum=" + innerShuffleOutputNum); - taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum); - } - - LOG.info(stage.getId() + ", The determined number of join partitions is " + taskNum); - - return taskNum; - // Is this stage the first step of group-by? - } else if (grpNode != null) { - boolean hasGroupColumns = true; - if (grpNode.getType() == NodeType.GROUP_BY) { - hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0; - } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) { - // Find current distinct stage node. 
- DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); - if (distinctNode == null) { - LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode"); - distinctNode = (DistinctGroupbyNode)grpNode; - } - hasGroupColumns = distinctNode.getGroupingColumns().length > 0; + boolean hasGroupColumns = distinctNode.getGroupingColumns().length > 0; - Enforcer enforcer = stage.getBlock().getEnforcer(); - if (enforcer == null) { - LOG.warn(stage.getId() + ", DistinctGroupbyNode's enforcer is null."); - } + Enforcer enforcer = currentStage.getBlock().getEnforcer(); + if (enforcer == null) { + LOG.warn(currentStage.getId() + ", DistinctGroupbyNode's enforcer is null."); + } else { EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode); if (property != null) { if (property.getDistinct().getIsMultipleAggregation()) { MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage(); - if (multiAggStage != MultipleAggregationStage.THRID_STAGE) { - hasGroupColumns = true; - } + hasGroupColumns = multiAggStage != MultipleAggregationStage.THRID_STAGE; } } } - if (!hasGroupColumns) { - LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); - return 1; - } else { - long volume = getInputVolume(stage.masterPlan, stage.context, stage.block); - - int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB); - LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB"); - // determine the number of task - int taskNum = (int) Math.ceil((double) volumeByMB / - masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE)); - LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + taskNum); - return taskNum; - } + return hasGroupColumns; } else { - LOG.info("============>>>>> Unexpected Case! 
<<<<<================"); - long volume = getInputVolume(stage.masterPlan, stage.context, stage.block); - - int mb = (int) Math.ceil((double)volume / 1048576); - LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB"); - // determine the number of task per 128MB - int taskNum = (int) Math.ceil((double)mb / 128); - LOG.info(stage.getId() + ", The determined number of partitions is " + taskNum); - return taskNum; + return ((WindowAggNode) aggNode).hasPartitionKeys(); } } + private static int calculatePartitionNumDefault(ExecutionBlock parent, Stage currentStage) { + int mb = getInputVolumeMB(parent, currentStage); + LOG.info(currentStage.getId() + ", Table's volume is approximately " + mb + " MB"); + // determine the number of task per 128 MB + return (int) Math.ceil((double)mb / 128); + } + + private static int getInputVolumeMB(ExecutionBlock parent, Stage currentStage) { + // NOTE: Get input volume from the parent EB. + // If the parent EB contains an UNION query, the volume of the whole input for the UNION is returned. + // Otherwise, only the input volume of the current EB is returned. 
+ long volume = getInputVolume(currentStage.masterPlan, currentStage.context, parent); + + return (int) Math.ceil((double)volume / StorageUnit.MB); + } + private static void schedule(Stage stage) throws IOException, TajoException { MasterPlan masterPlan = stage.getMasterPlan(); ExecutionBlock execBlock = stage.getBlock(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java index 48affc5..59adfc5 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java @@ -398,7 +398,7 @@ public class SelfDescSchemaBuildPhase extends LogicalPlanPreprocessPhase { * @param columns a set of columns * @return schema build from columns */ - private Schema buildSchemaFromColumnSet(Set<Column> columns) { + private Schema buildSchemaFromColumnSet(Set<Column> columns) throws TajoException { SchemaGraph schemaGraph = new SchemaGraph(); Set<ColumnVertex> rootVertexes = new HashSet<>(); Schema schema = new Schema(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 239becc..6897e17 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -414,7 +414,7 @@ public class PlannerUtil { * @param type to find * @return a found logical node */ - 
public static <T extends LogicalNode> T findMostBottomNode(LogicalNode node, NodeType type) { + public static <T extends LogicalNode> T findMostBottomNode(LogicalNode node, NodeType... type) { Preconditions.checkNotNull(node); Preconditions.checkNotNull(type);
