Repository: tajo Updated Branches: refs/heads/branch-0.11.2 f6417f4ea -> 3e083b164
TAJO-2082: Aggregation on a derived table which includes union can cause incorrect result. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/3e083b16 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/3e083b16 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/3e083b16 Branch: refs/heads/branch-0.11.2 Commit: 3e083b16413ba719767cb2ff5fc96567c53550d8 Parents: f6417f4 Author: Jihoon Son <[email protected]> Authored: Fri Mar 4 16:14:02 2016 +0900 Committer: Jihoon Son <[email protected]> Committed: Fri Mar 4 16:14:02 2016 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../TooLargeInputForCrossJoinException.java | 2 +- .../apache/tajo/util/graph/DirectedGraph.java | 3 +- .../tajo/util/graph/DirectedGraphVisitor.java | 4 +- .../tajo/util/graph/SimpleDirectedGraph.java | 12 +- .../util/graph/TestSimpleDirectedGraph.java | 3 +- .../tajo/engine/planner/global/DataChannel.java | 8 + .../engine/planner/global/ExecutionBlock.java | 160 ++++++++----- .../engine/planner/global/GlobalPlanner.java | 26 ++- .../tajo/engine/planner/global/MasterPlan.java | 45 +++- .../global/builder/DistinctGroupbyBuilder.java | 3 +- .../rewriter/rules/BroadcastJoinRule.java | 9 +- .../planner/physical/ExternalSortExec.java | 2 +- .../apache/tajo/querymaster/Repartitioner.java | 9 +- .../java/org/apache/tajo/querymaster/Stage.java | 224 +++++++++++-------- .../plan/rewrite/SelfDescSchemaBuildPhase.java | 2 +- .../org/apache/tajo/plan/util/PlannerUtil.java | 2 +- 17 files changed, 339 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 22eb057..1095644 100644 --- a/CHANGES +++ b/CHANGES @@ -20,6 +20,9 @@ Release 0.11.2 - unreleased BUG FIXES + TAJO-2082: 
Aggregation on a derived table which includes union can cause + incorrect result. (jihoon) + TAJO-2081: Incorrect task locality on single node. (jinho) TAJO-2080: ArrayIndexOutOfBoundsException when performing aggregation on an http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java index 55d5f46..d958cc6 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java @@ -33,6 +33,6 @@ public class TooLargeInputForCrossJoinException extends TajoException { } public TooLargeInputForCrossJoinException(String[] relations, long currentBroadcastThreshold) { - super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " MB"); + super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " KB"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java index 5433ef5..53757a5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java @@ -19,6 +19,7 @@ package org.apache.tajo.util.graph; import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.exception.TajoException; import 
java.util.List; @@ -60,5 +61,5 @@ public interface DirectedGraph<V, E> extends Graph<V, E> { /** * It visits all vertices in a post-order traverse way. */ - void accept(V src, DirectedGraphVisitor<V> visitor); + void accept(V src, DirectedGraphVisitor<V> visitor) throws TajoException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java index 139c2b4..5040634 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java @@ -18,8 +18,10 @@ package org.apache.tajo.util.graph; +import org.apache.tajo.exception.TajoException; + import java.util.Stack; public interface DirectedGraphVisitor<V> { - void visit(Stack<V> stack, V v); + void visit(Stack<V> stack, V v) throws TajoException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java index 045add5..55fe64b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java @@ -21,6 +21,8 @@ package org.apache.tajo.util.graph; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.exception.TajoException; +import 
org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.util.TUtil; import java.util.*; @@ -219,12 +221,12 @@ public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> { } @Override - public void accept(V source, DirectedGraphVisitor<V> visitor) { + public void accept(V source, DirectedGraphVisitor<V> visitor) throws TajoException { Stack<V> stack = new Stack<V>(); visitRecursive(stack, source, visitor); } - private void visitRecursive(Stack<V> stack, V current, DirectedGraphVisitor<V> visitor) { + private void visitRecursive(Stack<V> stack, V current, DirectedGraphVisitor<V> visitor) throws TajoException { stack.push(current); for (V child : getChilds(current)) { visitRecursive(stack, child, visitor); @@ -248,7 +250,11 @@ public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> { public String toStringGraph(V vertex) { StringBuilder sb = new StringBuilder(); QueryGraphTopologyStringBuilder visitor = new QueryGraphTopologyStringBuilder(); - accept(vertex, visitor); + try { + accept(vertex, visitor); + } catch (TajoException e) { + throw new TajoRuntimeException(e); + } Stack<DepthString> depthStrings = visitor.getDepthStrings(); while(!depthStrings.isEmpty()) { sb.append(printDepthString(depthStrings.pop())); http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java b/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java index 676d39f..66da5b8 100644 --- a/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java +++ b/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java @@ -20,6 +20,7 @@ package org.apache.tajo.util.graph; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.util.graph.DirectedGraphVisitor; import org.apache.tajo.util.graph.SimpleDirectedGraph; import org.junit.Test; @@ -34,7 +35,7 @@ public class TestSimpleDirectedGraph { private static final Log LOG = LogFactory.getLog(TestSimpleDirectedGraph.class); @Test - public final void test() { + public final void test() throws TajoException { SimpleDirectedGraph<String, Integer> graph = new SimpleDirectedGraph<String, Integer>(); // root http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java index c779d2f..10e9973 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java @@ -99,6 +99,14 @@ public class DataChannel { return shuffleType; } + public boolean isHashShuffle() { + return shuffleType == ShuffleType.HASH_SHUFFLE || shuffleType == ShuffleType.SCATTERED_HASH_SHUFFLE; + } + + public boolean isRangeShuffle() { + return shuffleType == ShuffleType.RANGE_SHUFFLE; + } + public boolean needShuffle() { return shuffleType != ShuffleType.NONE_SHUFFLE; } http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java index c71324d..02e6609 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java @@ -16,7 +16,10 @@ package org.apache.tajo.engine.planner.global; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.util.TUtil; import java.util.*; @@ -31,19 +34,15 @@ import java.util.*; public class ExecutionBlock { private ExecutionBlockId executionBlockId; private LogicalNode plan = null; - private StoreTableNode store = null; - private List<ScanNode> scanlist = new ArrayList<ScanNode>(); private Enforcer enforcer = new Enforcer(); // Actual ScanNode's ExecutionBlockId -> Delegated ScanNode's ExecutionBlockId. private Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = new HashMap<ExecutionBlockId, ExecutionBlockId>(); - private boolean hasJoinPlan; - private boolean hasUnionPlan; - private boolean isUnionOnly; - private Map<String, ScanNode> broadcastRelations = TUtil.newHashMap(); + private PlanContext planContext; + /* * An execution block is null-supplying or preserved-row when its output is used as an input for outer join. * These properties are decided based on the type of parent execution block's outer join. 
@@ -99,52 +98,16 @@ public class ExecutionBlock { return executionBlockId; } - public void setPlan(LogicalNode plan) { - hasJoinPlan = false; - hasUnionPlan = false; - isUnionOnly = true; - this.scanlist.clear(); + public void setPlan(LogicalNode plan) throws TajoException { this.plan = plan; if (plan == null) { return; } - LogicalNode node = plan; - ArrayList<LogicalNode> s = new ArrayList<LogicalNode>(); - s.add(node); - while (!s.isEmpty()) { - node = s.remove(s.size()-1); - // TODO: the below code should be improved to handle every case - if (isUnionOnly && node.getType() != NodeType.ROOT && node.getType() != NodeType.TABLE_SUBQUERY && - node.getType() != NodeType.SCAN && node.getType() != NodeType.PARTITIONS_SCAN && - node.getType() != NodeType.UNION && node.getType() != NodeType.PROJECTION) { - isUnionOnly = false; - } - if (node instanceof UnaryNode) { - UnaryNode unary = (UnaryNode) node; - s.add(s.size(), unary.getChild()); - } else if (node instanceof BinaryNode) { - BinaryNode binary = (BinaryNode) node; - if (binary.getType() == NodeType.JOIN) { - hasJoinPlan = true; - } else if (binary.getType() == NodeType.UNION) { - hasUnionPlan = true; - } - s.add(s.size(), binary.getLeftChild()); - s.add(s.size(), binary.getRightChild()); - } else if (node instanceof ScanNode) { - scanlist.add((ScanNode)node); - } else if (node instanceof TableSubQueryNode) { - TableSubQueryNode subQuery = (TableSubQueryNode) node; - s.add(s.size(), subQuery.getSubQuery()); - } else if (node instanceof StoreTableNode) { - store = (StoreTableNode)node; - } - } - if (!hasUnionPlan) { - isUnionOnly = false; - } + final PlanVisitor visitor = new PlanVisitor(); + planContext = new PlanContext(); + visitor.visit(planContext, null, null, plan, new Stack<LogicalNode>()); } public void addUnionScan(ExecutionBlockId realScanEbId, ExecutionBlockId delegatedScanEbId) { @@ -164,12 +127,12 @@ public class ExecutionBlock { } public StoreTableNode getStoreTableNode() { - return store; + return 
planContext.store; } public int getNonBroadcastRelNum() { int nonBroadcastRelNum = 0; - for (ScanNode scanNode : scanlist) { + for (ScanNode scanNode : planContext.scanlist) { if (!broadcastRelations.containsKey(scanNode.getCanonicalName())) { nonBroadcastRelNum++; } @@ -178,19 +141,23 @@ public class ExecutionBlock { } public ScanNode [] getScanNodes() { - return this.scanlist.toArray(new ScanNode[scanlist.size()]); + return planContext.scanlist.toArray(new ScanNode[planContext.scanlist.size()]); } public boolean hasJoin() { - return hasJoinPlan; + return planContext.hasJoinPlan; } public boolean hasUnion() { - return hasUnionPlan; + return planContext.hasUnionPlan; + } + + public boolean hasAgg() { + return planContext.hasAggPlan; } public boolean isUnionOnly() { - return isUnionOnly; + return planContext.isUnionOnly(); } public void addBroadcastRelation(ScanNode relationNode) { @@ -236,4 +203,93 @@ public class ExecutionBlock { public boolean isPreservedRow() { return preservedRow; } + + private class PlanContext { + StoreTableNode store = null; + + List<ScanNode> scanlist = new ArrayList<>(); + + boolean hasJoinPlan = false; + boolean hasUnionPlan = false; + boolean hasAggPlan = false; + boolean hasSortPlan = false; + + boolean isUnionOnly() { + return hasUnionPlan && !hasJoinPlan && !hasAggPlan && !hasSortPlan; + } + } + + private class PlanVisitor extends BasicLogicalPlanVisitor<PlanContext, LogicalNode> { + + @Override + public LogicalNode visitJoin(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node, + Stack<LogicalNode> stack) throws TajoException { + context.hasJoinPlan = true; + return super.visitJoin(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitGroupBy(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node, + Stack<LogicalNode> stack) throws TajoException { + context.hasAggPlan = true; + return super.visitGroupBy(context, plan, block, node, stack); + } + + 
@Override + public LogicalNode visitWindowAgg(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node, + Stack<LogicalNode> stack) throws TajoException { + context.hasAggPlan = true; + return super.visitWindowAgg(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitDistinctGroupby(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + DistinctGroupbyNode node, Stack<LogicalNode> stack) throws TajoException { + context.hasAggPlan = true; + return super.visitDistinctGroupby(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitSort(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, SortNode node, + Stack<LogicalNode> stack) throws TajoException { + context.hasSortPlan = true; + return super.visitSort(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitUnion(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node, + Stack<LogicalNode> stack) throws TajoException { + context.hasUnionPlan = true; + return super.visitUnion(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitStoreTable(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, StoreTableNode node, + Stack<LogicalNode> stack) throws TajoException { + context.store = node; + return super.visitStoreTable(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node, + Stack<LogicalNode> stack) throws TajoException { + context.scanlist.add(node); + return super.visitScan(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitPartitionedTableScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, + PartitionedTableScanNode node, Stack<LogicalNode> stack) + throws TajoException { + context.scanlist.add(node); + return 
super.visitPartitionedTableScan(context, plan, block, node, stack); + } + + @Override + public LogicalNode visitIndexScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, IndexScanNode node, + Stack<LogicalNode> stack) throws TajoException { + context.scanlist.add(node); + return super.visitIndexScan(context, plan, block, node, stack); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 15354fc..82ca6d4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -199,7 +199,7 @@ public class GlobalPlanner { } private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode, - ExecutionBlock leftBlock, ExecutionBlock rightBlock) { + ExecutionBlock leftBlock, ExecutionBlock rightBlock) throws TajoException { MasterPlan masterPlan = context.plan; ExecutionBlock currentBlock; @@ -601,7 +601,7 @@ public class GlobalPlanner { } private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock, - GroupbyNode groupbyNode) { + GroupbyNode groupbyNode) throws TajoException { MasterPlan masterPlan = context.plan; ExecutionBlock currentBlock; @@ -680,7 +680,8 @@ public class GlobalPlanner { } private ExecutionBlock buildGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock, - GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) { + GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) + throws TajoException { DataChannel lastDataChannel = null; // It pushes down the first phase group-by operator into all child blocks. 
@@ -720,7 +721,8 @@ public class GlobalPlanner { } private ExecutionBlock buildTwoPhaseGroupby(MasterPlan masterPlan, ExecutionBlock latestBlock, - GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) { + GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) + throws TajoException { ExecutionBlock childBlock = latestBlock; childBlock.setPlan(firstPhaseGroupby); @@ -785,7 +787,8 @@ public class GlobalPlanner { return firstPhaseGroupBy; } - private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) { + private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) + throws TajoException { MasterPlan masterPlan = context.plan; ExecutionBlock currentBlock; @@ -865,7 +868,8 @@ public class GlobalPlanner { */ private ExecutionBlock buildShuffleAndStorePlanNoPartitionedTableWithUnion(GlobalPlanContext context, StoreTableNode currentNode, - ExecutionBlock childBlock) { + ExecutionBlock childBlock) + throws TajoException { for (ExecutionBlock grandChildBlock : context.plan.getChilds(childBlock)) { StoreTableNode copy = PlannerUtil.clone(context.plan.getLogicalPlan(), currentNode); copy.setChild(grandChildBlock.getPlan()); @@ -880,7 +884,8 @@ public class GlobalPlanner { */ private ExecutionBlock buildShuffleAndStorePlanToPartitionedTableWithUnion(GlobalPlanContext context, StoreTableNode currentNode, - ExecutionBlock lastBlock) { + ExecutionBlock lastBlock) + throws TajoException { MasterPlan masterPlan = context.plan; DataChannel lastChannel = null; @@ -904,7 +909,7 @@ public class GlobalPlanner { */ private ExecutionBlock buildShuffleAndStorePlanToPartitionedTable(GlobalPlanContext context, StoreTableNode currentNode, - ExecutionBlock lastBlock) { + ExecutionBlock lastBlock) throws TajoException { MasterPlan masterPlan = context.plan; ExecutionBlock nextBlock = masterPlan.newExecutionBlock(); @@ -925,7 +930,7 @@ public class GlobalPlanner { private 
ExecutionBlock buildNoPartitionedStorePlan(GlobalPlanContext context, StoreTableNode currentNode, - ExecutionBlock childBlock) { + ExecutionBlock childBlock) throws TajoException { if (hasUnionChild(currentNode)) { // when the below is union return buildShuffleAndStorePlanNoPartitionedTableWithUnion(context, currentNode, childBlock); } else { @@ -1245,7 +1250,8 @@ public class GlobalPlanner { return node; } - private LogicalNode handleUnaryNode(GlobalPlanContext context, LogicalNode child, LogicalNode node) { + private LogicalNode handleUnaryNode(GlobalPlanContext context, LogicalNode child, LogicalNode node) + throws TajoException { ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID()); execBlock.setPlan(node); context.execBlockMap.put(node.getPID(), execBlock); http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java index 8a7229b..f2edd71 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java @@ -21,10 +21,13 @@ */ package org.apache.tajo.engine.planner.global; +import com.google.common.base.Optional; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; +import org.apache.tajo.annotation.NotNull; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.serder.PlanProto.ShuffleType; @@ -47,6 +50,45 @@ public class MasterPlan { private 
SimpleDirectedGraph<ExecutionBlockId, DataChannel> execBlockGraph = new SimpleDirectedGraph<ExecutionBlockId, DataChannel>(); + private Map<ExecutionBlockId, ShuffleContext> shuffleInfo = new HashMap<>(); + + /** + * + */ + public class ShuffleContext { + ExecutionBlockId parentEbId; + int partitionNum; + + public ShuffleContext(ExecutionBlockId parentEbId, int partitionNum) { + this.parentEbId = parentEbId; + this.partitionNum = partitionNum; + } + + public ExecutionBlockId getParentEbId() { + return parentEbId; + } + + public int getPartitionNum() { + return partitionNum; + } + } + + /** + * + * @param ebId + * @param partitionNum + */ + public void addShuffleInfo(ExecutionBlockId ebId, int partitionNum) { + ExecutionBlockId parentId = getParent(getExecBlock(ebId)).getId(); + shuffleInfo.put(parentId, new ShuffleContext(ebId, partitionNum)); + } + + public Optional<ShuffleContext> getShuffleInfo(ExecutionBlockId ebId) { + ExecutionBlockId parentId = getParent(getExecBlock(ebId)).getId(); + return shuffleInfo.containsKey(parentId) ? 
+ Optional.of(shuffleInfo.get(parentId)) : Optional.<ShuffleContext>absent(); + } + public ExecutionBlockId newExecutionBlockId() { return new ExecutionBlockId(queryId, nextId.incrementAndGet()); } @@ -215,14 +257,13 @@ public class MasterPlan { return getChild(executionBlock.getId(), idx); } - public void accept(ExecutionBlockId v, DirectedGraphVisitor<ExecutionBlockId> visitor) { + public void accept(ExecutionBlockId v, DirectedGraphVisitor<ExecutionBlockId> visitor) throws TajoException { execBlockGraph.accept(v, visitor); } @Override public String toString() { StringBuilder sb = new StringBuilder(); - ExecutionBlockCursor cursor = new ExecutionBlockCursor(this); sb.append("-------------------------------------------------------------------------------\n"); sb.append("Execution Block Graph (TERMINAL - " + getTerminalBlock() + ")\n"); sb.append("-------------------------------------------------------------------------------\n"); http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java index 210ac2c..7cecc9d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java @@ -30,6 +30,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext; import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; 
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; @@ -730,7 +731,7 @@ public class DistinctGroupbyBuilder { private ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock, DistinctGroupbyNode firstPhaseGroupBy, - DistinctGroupbyNode secondPhaseGroupBy) { + DistinctGroupbyNode secondPhaseGroupBy) throws TajoException { DataChannel lastDataChannel = null; // It pushes down the first phase group-by operator into all child blocks. http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java index b320a81..aff5038 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java @@ -172,7 +172,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { } @Override - public void visit(Stack<ExecutionBlockId> stack, ExecutionBlockId executionBlockId) { + public void visit(Stack<ExecutionBlockId> stack, ExecutionBlockId executionBlockId) throws TajoException { ExecutionBlock current = plan.getExecBlock(executionBlockId); if (plan.isLeaf(current)) { @@ -206,7 +206,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { * * @param current */ - private void visitNonLeafNode(ExecutionBlock current) { + private void visitNonLeafNode(ExecutionBlock current) throws TajoException { // At non-leaf execution blocks, merge broadcastable children's 
plan with the current plan. if (!plan.isTerminal(current)) { @@ -420,7 +420,8 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { * @param parent parent block who has join nodes * @return */ - private ExecutionBlock mergeTwoPhaseJoinIfPossible(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) { + private ExecutionBlock mergeTwoPhaseJoinIfPossible(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) + throws TajoException { ScanNode scanForChild = findScanForChildEb(child, parent); parentFinder.set(scanForChild); @@ -443,7 +444,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule { } private void addUnionNodeIfNecessary(Map<ExecutionBlockId, ExecutionBlockId> unionScanMap, MasterPlan plan, - ExecutionBlock child, ExecutionBlock current) { + ExecutionBlock child, ExecutionBlock current) throws TajoException { if (unionScanMap != null) { List<ExecutionBlockId> unionScans = TUtil.newList(); ExecutionBlockId representativeId = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 69631b5..c744d95 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -85,7 +85,7 @@ public class ExternalSortExec extends SortExec { /** the defaultFanout of external sort */ private final int defaultFanout; /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. 
*/ - private int sortBufferBytesNum; + private final long sortBufferBytesNum; /** the number of available cores */ private final int allocatedCoreNum; /** If there are available multiple cores, it tries parallel merge. */ http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index 59c1cb4..9a63cd3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -606,10 +606,9 @@ public class Repartitioner { MasterPlan masterPlan, Stage stage, int maxNum) throws IOException { DataChannel channel = masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0); - if (channel.getShuffleType() == HASH_SHUFFLE - || channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { + if (channel.isHashShuffle()) { scheduleHashShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum); - } else if (channel.getShuffleType() == RANGE_SHUFFLE) { + } else if (channel.isRangeShuffle()) { scheduleRangeShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum); } else { throw new TajoInternalError("Cannot support partition type"); @@ -1275,12 +1274,12 @@ public class Repartitioner { } // set the partition number for group by and sort - if (channel.getShuffleType() == HASH_SHUFFLE) { + if (channel.isHashShuffle()) { if (execBlock.getPlan().getType() == NodeType.GROUP_BY || execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) { keys = channel.getShuffleKeys(); } - } else if (channel.getShuffleType() == RANGE_SHUFFLE) { + } else if (channel.isRangeShuffle()) { if (execBlock.getPlan().getType() == NodeType.SORT) { SortNode sort = (SortNode) execBlock.getPlan(); keys = 
new Column[sort.getSortKeys().length]; http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 04ff115..d916b61 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -18,7 +18,7 @@ package org.apache.tajo.querymaster; -import com.google.common.base.Preconditions; +import com.google.common.base.*; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -42,14 +42,17 @@ import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.planner.global.MasterPlan.ShuffleContext; import org.apache.tajo.error.Errors.SerializedException; import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TaskState; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.util.PlannerUtil; @@ -855,6 +858,7 @@ public class Stage implements EventHandler<StageEvent> { ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock()); DataChannel channel 
= stage.getMasterPlan().getChannel(stage.getId(), parent.getId()); setShuffleIfNecessary(stage, channel); + // TODO: verify changed shuffle plan initTaskScheduler(stage); // execute pre-processing asyncronously stage.getContext().getQueryMasterContext().getSingleEventExecutor() @@ -920,8 +924,8 @@ public class Stage implements EventHandler<StageEvent> { * methods and the number of partitions to a given Stage. */ private static void setShuffleIfNecessary(Stage stage, DataChannel channel) { - if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) { - int numTasks = calculateShuffleOutputNum(stage, channel); + if (channel.isHashShuffle()) { + int numTasks = calculateShuffleOutputNum(stage); Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel); } } @@ -933,122 +937,154 @@ public class Stage implements EventHandler<StageEvent> { * @param stage * @return */ - public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) { + public static int calculateShuffleOutputNum(Stage stage) { MasterPlan masterPlan = stage.getMasterPlan(); - ExecutionBlock parent = masterPlan.getParent(stage.getBlock()); - LogicalNode grpNode = null; - if (parent != null) { - grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY); - if (grpNode == null) { - grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY); + // For test + if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) { + int partitionNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM); + LOG.info(stage.getId() + ", The determined number of partitions is " + partitionNum + " for test"); + return partitionNum; + } + + com.google.common.base.Optional<ShuffleContext> optional = masterPlan.getShuffleInfo(stage.getId()); + if (optional.isPresent()) { + LOG.info("# of partitions is determined as " + optional.get().getPartitionNum() + + "to match with sibling eb's partition number"); + return 
optional.get().getPartitionNum(); + + } else { + ExecutionBlock parent = masterPlan.getParent(stage.getBlock()); + int partitionNum; + + if (parent != null) { + // We assume this execution block is the first stage of a join if the parent block contains a join over two or more non-broadcast relations. + if (parent.hasJoin()) { + if (parent.getNonBroadcastRelNum() > 1) { + // repartition join + partitionNum = calculatePartitionNumForRepartitionJoin(parent, stage); + LOG.info(stage.getId() + ", The determined number of partitions for repartition join is " + partitionNum); + } else { + // broadcast join + // partition number is calculated using the volume of the large table + partitionNum = calculatePartitionNumDefault(parent, stage); + LOG.info(stage.getId() + ", The determined number of partitions for broadcast join is " + partitionNum); + } + + } else { + // Is this stage the first step of group-by? + if (parent.hasAgg()) { + LogicalNode grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY, + NodeType.DISTINCT_GROUP_BY, NodeType.WINDOW_AGG); + if (grpNode == null) { + throw new TajoInternalError("Cannot find aggregation plan for " + stage.getId()); + } + + if (!hasGroupKeys(stage, grpNode)) { + LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); + partitionNum = 1; + } else { + partitionNum = calculatePartitionNumForAgg(parent, stage); + LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + partitionNum); + } + + } else { + // NOTE: the below code might be executed during sort, but the partition number is not used anymore for sort. + LOG.info("============>>>>> Unexpected Case! <<<<<================"); + partitionNum = calculatePartitionNumDefault(parent, stage); + LOG.info(stage.getId() + ", The determined number of partitions is " + partitionNum); + } + + } + } else { + // This case means that the parent eb does not exist even though data shuffle is required after the current eb.
+ throw new TajoInternalError("Cannot find parent execution block of " + stage.block.getId()); } + + // Record the partition number for sibling execution blocks + masterPlan.addShuffleInfo(stage.getId(), partitionNum); + return partitionNum; } + } + + private static int calculatePartitionNumForRepartitionJoin(ExecutionBlock parent, Stage currentStage) { + List<ExecutionBlock> childs = currentStage.masterPlan.getChilds(parent); - // We assume this execution block the first stage of join if two or more tables are included in this block, - if (parent != null && (parent.getNonBroadcastRelNum()) >= 2) { - List<ExecutionBlock> childs = masterPlan.getChilds(parent); + // for outer + ExecutionBlock outer = childs.get(0); + long outerVolume = getInputVolume(currentStage.masterPlan, currentStage.context, outer); - // for outer - ExecutionBlock outer = childs.get(0); - long outerVolume = getInputVolume(stage.masterPlan, stage.context, outer); + // for inner + ExecutionBlock inner = childs.get(1); + long innerVolume = getInputVolume(currentStage.masterPlan, currentStage.context, inner); + LOG.info(currentStage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, " + + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB"); - // for inner - ExecutionBlock inner = childs.get(1); - long innerVolume = getInputVolume(stage.masterPlan, stage.context, inner); - LOG.info(stage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, " - + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB"); + long bigger = Math.max(outerVolume, innerVolume); - long bigger = Math.max(outerVolume, innerVolume); + int mb = (int) Math.ceil((double) bigger / 1048576); + LOG.info(currentStage.getId() + ", Bigger Table's volume is approximately " + mb + " MB"); - int mb = (int) Math.ceil((double) bigger / 1048576); - LOG.info(stage.getId() + ", Bigger Table's volume is approximately " + mb + " MB"); + return (int) 
Math.ceil((double) mb / + currentStage.masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE)); + } - int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE)); + private static int calculatePartitionNumForAgg(ExecutionBlock parent, Stage stage) { + int volumeByMB = getInputVolumeMB(parent, stage); + LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB"); + // one partition per GROUPBY_PER_SHUFFLE_SIZE MB of input, rounded up + return (int) Math.ceil((double) volumeByMB / + stage.masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE)); - if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) { - taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM); - LOG.warn("!!!!! TESTCASE MODE !!!!!"); - } + } - // The shuffle output numbers of join may be inconsistent by execution block order. - // Thus, we need to compare the number with DataChannel output numbers. - // If the number is right, the number and DataChannel output numbers will be consistent. - int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0; - for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) { - outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum()); - } - for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) { - innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum()); + private static boolean hasGroupKeys(Stage currentStage, LogicalNode aggNode) { + if (aggNode.getType() == NodeType.GROUP_BY) { + return ((GroupbyNode)aggNode).getGroupingColumns().length > 0; + } else if (aggNode.getType() == NodeType.DISTINCT_GROUP_BY) { + // Find current distinct stage node.
+ DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(currentStage.getBlock().getPlan(), + NodeType.DISTINCT_GROUP_BY); + if (distinctNode == null) { + LOG.warn(currentStage.getId() + ", Can't find current DistinctGroupbyNode"); + distinctNode = (DistinctGroupbyNode)aggNode; } - if (outerShuffleOutputNum != innerShuffleOutputNum - && taskNum != outerShuffleOutputNum - && taskNum != innerShuffleOutputNum) { - LOG.info(stage.getId() + ", Change determined number of join partitions cause difference of outputNum" + - ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) + - ", outerShuffleOutptNum=" + outerShuffleOutputNum + - ", innerShuffleOutputNum=" + innerShuffleOutputNum); - taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum); - } - - LOG.info(stage.getId() + ", The determined number of join partitions is " + taskNum); - - return taskNum; - // Is this stage the first step of group-by? - } else if (grpNode != null) { - boolean hasGroupColumns = true; - if (grpNode.getType() == NodeType.GROUP_BY) { - hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0; - } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) { - // Find current distinct stage node. 
- DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); - if (distinctNode == null) { - LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode"); - distinctNode = (DistinctGroupbyNode)grpNode; - } - hasGroupColumns = distinctNode.getGroupingColumns().length > 0; + boolean hasGroupColumns = distinctNode.getGroupingColumns().length > 0; - Enforcer enforcer = stage.getBlock().getEnforcer(); - if (enforcer == null) { - LOG.warn(stage.getId() + ", DistinctGroupbyNode's enforcer is null."); - } + Enforcer enforcer = currentStage.getBlock().getEnforcer(); + if (enforcer == null) { + LOG.warn(currentStage.getId() + ", DistinctGroupbyNode's enforcer is null."); + } else { EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode); if (property != null) { if (property.getDistinct().getIsMultipleAggregation()) { MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage(); - if (multiAggStage != MultipleAggregationStage.THRID_STAGE) { - hasGroupColumns = true; - } + hasGroupColumns = multiAggStage != MultipleAggregationStage.THRID_STAGE; } } } - if (!hasGroupColumns) { - LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); - return 1; - } else { - long volume = getInputVolume(stage.masterPlan, stage.context, stage.block); - - int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB); - LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB"); - // determine the number of task - int taskNum = (int) Math.ceil((double) volumeByMB / - masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE)); - LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + taskNum); - return taskNum; - } + return hasGroupColumns; } else { - LOG.info("============>>>>> Unexpected Case! 
<<<<<================"); - long volume = getInputVolume(stage.masterPlan, stage.context, stage.block); - - int mb = (int) Math.ceil((double)volume / 1048576); - LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB"); - // determine the number of task per 128MB - int taskNum = (int) Math.ceil((double)mb / 128); - LOG.info(stage.getId() + ", The determined number of partitions is " + taskNum); - return taskNum; + return ((WindowAggNode) aggNode).hasPartitionKeys(); } } + private static int calculatePartitionNumDefault(ExecutionBlock parent, Stage currentStage) { + int mb = getInputVolumeMB(parent, currentStage); + LOG.info(currentStage.getId() + ", Table's volume is approximately " + mb + " MB"); + // determine the number of task per 128 MB + return (int) Math.ceil((double)mb / 128); + } + + private static int getInputVolumeMB(ExecutionBlock parent, Stage currentStage) { + // NOTE: Get input volume from the parent EB. + // If the parent EB contains an UNION query, the volume of the whole input for the UNION is returned. + // Otherwise, only the input volume of the current EB is returned. 
+ long volume = getInputVolume(currentStage.masterPlan, currentStage.context, parent); + + return (int) Math.ceil((double)volume / StorageUnit.MB); + } + private static void schedule(Stage stage) throws IOException, TajoException { MasterPlan masterPlan = stage.getMasterPlan(); ExecutionBlock execBlock = stage.getBlock(); http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java index bd61342..febfaa8 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java @@ -398,7 +398,7 @@ public class SelfDescSchemaBuildPhase extends LogicalPlanPreprocessPhase { * @param columns a set of columns * @return schema build from columns */ - private Schema buildSchemaFromColumnSet(Set<Column> columns) { + private Schema buildSchemaFromColumnSet(Set<Column> columns) throws TajoException { SchemaGraph schemaGraph = new SchemaGraph(); Set<ColumnVertex> rootVertexes = new HashSet<>(); Schema schema = new Schema(); http://git-wip-us.apache.org/repos/asf/tajo/blob/3e083b16/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index 44dfc9f..cb3f9b8 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -415,7 +415,7 @@ public class PlannerUtil { * @param type to find * @return a found logical node */ - 
public static <T extends LogicalNode> T findMostBottomNode(LogicalNode node, NodeType type) { + public static <T extends LogicalNode> T findMostBottomNode(LogicalNode node, NodeType... type) { Preconditions.checkNotNull(node); Preconditions.checkNotNull(type);
