Repository: tajo Updated Branches: refs/heads/master f124b87e6 -> c2725a779
TAJO-1130: Concurrent execution of independent execution blocks. Closes #423 Signed-off-by: Jihoon Son <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c2725a77 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c2725a77 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c2725a77 Branch: refs/heads/master Commit: c2725a779c9f80d60a906657aaa95aff1a48edd0 Parents: f124b87 Author: navis.ryu <[email protected]> Authored: Thu May 28 14:48:41 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Thu May 28 14:49:57 2015 +0900 ---------------------------------------------------------------------- CHANGES | 7 +- .../main/java/org/apache/tajo/SessionVars.java | 3 + .../java/org/apache/tajo/conf/TajoConf.java | 5 +- .../tajo/engine/planner/global/DataChannel.java | 4 + .../planner/global/ExecutionBlockCursor.java | 101 ++++++++------- .../engine/planner/global/ExecutionQueue.java | 43 +++++++ .../tajo/engine/planner/global/MasterPlan.java | 6 +- .../planner/global/ParallelExecutionQueue.java | 126 +++++++++++++++++++ .../rules/GlobalPlanEqualityTester.java | 3 +- .../ExplainGlobalPlanPreprocessorForTest.java | 3 +- .../NonForwardQueryResultSystemScanner.java | 3 +- .../apache/tajo/master/rm/WorkerResource.java | 16 +-- .../java/org/apache/tajo/querymaster/Query.java | 89 +++++++------ .../org/apache/tajo/TajoTestingCluster.java | 9 +- .../tajo/master/TestExecutionBlockCursor.java | 4 +- .../TestTajoCli/testHelpSessionVars.result | 1 + 16 files changed, 313 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 0d4ae04..a2af206 100644 --- a/CHANGES +++ b/CHANGES @@ -27,15 +27,18 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1130: Concurrent execution of independent execution blocks. + (Contributed by navis, Committed by jihoon) + TAJO-1618: [Rest API] queries/{queryId} should set default print type. - (Contributed by DaeMyoung Kang, Committed by jihoon) + (Contributed by DaeMyung Kang, Committed by jihoon) TAJO-1553: Improve broadcast join planning. (jihoon) TAJO-1577: Add test cases to verify join plans. (jihoon) TAJO-1607: Tajo Rest Cache-Id should be bigger than zero. (Contributed by - DaeMyoung Kang, Committed by hyunsik) + DaeMyung Kang, Committed by hyunsik) TAJO-1603: Refactor StorageManager. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-common/src/main/java/org/apache/tajo/SessionVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index 0d2319e..031387c 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -107,6 +107,9 @@ public enum SessionVars implements ConfigKey { GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT, Boolean.class, Validators.bool()), + QUERY_EXECUTE_PARALLEL(ConfVars.$QUERY_EXECUTE_PARALLEL_MAX, "Maximum parallel running of execution blocks for a query", + DEFAULT, Integer.class, Validators.min("0")), + // for physical Executors EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT, Long.class, Validators.min("0")), http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index e20658b..3f350c3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -305,7 +305,7 @@ public class TajoConf extends Configuration { ///////////////////////////////////////////////////////////////////////////////// // User Session Configuration // - // All session variables begin with dollor($) sign. They are default configs + // All session variables begin with dollar($) sign. They are default configs // for session variables. Do not directly use the following configs. Instead, // please use QueryContext in order to access session variables. // @@ -330,6 +330,9 @@ public class TajoConf extends Configuration { $GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true), + // WARN "tajo.yarn-rm.parallel-task-runner-launcher-num" should be set enough to avoid deadlock + $QUERY_EXECUTE_PARALLEL_MAX("tajo.query.execute.parallel.max", 1), + // for physical Executors $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L), $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes", http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java index dc6cd4c..3adc0a3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java @@ -99,6 +99,10 @@ public class DataChannel { return shuffleType; } + public boolean needShuffle() { + return shuffleType != ShuffleType.NONE_SHUFFLE; + } + public TransmitType getTransmitType() { return this.transmitType; } http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java index 9f82672..c6864b9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java @@ -15,6 +15,7 @@ package org.apache.tajo.engine.planner.global; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.SessionVars; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; @@ -23,10 +24,9 @@ import java.util.concurrent.atomic.AtomicInteger; * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks. * This class is a pointer to an ExecutionBlock that the query engine should execute. */ -public class ExecutionBlockCursor { +public class ExecutionBlockCursor implements Iterable<ExecutionBlock> { private MasterPlan masterPlan; private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>(); - private int cursor = 0; private List<BuildOrderItem> executionOrderedBlocks = new ArrayList<BuildOrderItem>(); private List<BuildOrderItem> notOrderedSiblingBlocks = new ArrayList<BuildOrderItem>(); @@ -45,29 +45,71 @@ public class ExecutionBlockCursor { } } + @Override + public Iterator<ExecutionBlock> iterator() { + return orderedBlocks.iterator(); + } + public int size() { return orderedBlocks.size(); } + public ExecutionQueue newCursor() { + int parallel = masterPlan.getContext().getInt(SessionVars.QUERY_EXECUTE_PARALLEL); + if (parallel > 1) { + return new ParallelExecutionQueue(masterPlan, parallel); + } + return new SimpleExecutionQueue(); + } + + public class SimpleExecutionQueue implements ExecutionQueue { + + private final Iterator<ExecutionBlock> iterator = iterator(); + private ExecutionBlock last; + + @Override + public int size() { + return ExecutionBlockCursor.this.size(); + } + + @Override + public ExecutionBlock[] first() { + return iterator.hasNext() ? next(null) : null; + } + + @Override + public ExecutionBlock[] next(ExecutionBlockId blockId) { + return iterator.hasNext() ? new ExecutionBlock[]{last = iterator.next()} : null; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + for (ExecutionBlock block : ExecutionBlockCursor.this) { + if (sb.length() > 0) { + sb.append(','); + } + if (block == last) { + sb.append('('); + } + sb.append(block.getId().getId()); + if (block == last) { + sb.append(')'); + } + } + return sb.toString(); + } + } + // Add all execution blocks in a depth first and postfix order private void buildDepthFirstOrder(ExecutionBlock current) { - Stack<ExecutionBlock> stack = new Stack<ExecutionBlock>(); if (!masterPlan.isLeaf(current.getId())) { for (ExecutionBlock execBlock : masterPlan.getChilds(current)) { - if (!masterPlan.isLeaf(execBlock)) { - buildDepthFirstOrder(execBlock); - } else { - stack.push(execBlock); - } - } - for (ExecutionBlock execBlock : stack) { buildDepthFirstOrder(execBlock); } } orderedBlocks.add(current); } - private void buildSiblingFirstOrder(ExecutionBlock current) { /* |-eb_1404887024677_0004_000007 @@ -178,41 +220,4 @@ public class ExecutionBlockCursor { return result; } } - - public boolean hasNext() { - return cursor < orderedBlocks.size(); - } - - public ExecutionBlock nextBlock() { - return orderedBlocks.get(cursor++); - } - - public ExecutionBlock peek() { - return orderedBlocks.get(cursor); - } - - public ExecutionBlock peek(int skip) { - return orderedBlocks.get(cursor + skip); - } - - public void reset() { - cursor = 0; - } - - public String toString() { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < orderedBlocks.size(); i++) { - if (i == (cursor == 0 ? 0 : cursor - 1)) { - sb.append("(").append(orderedBlocks.get(i).getId().getId()).append(")"); - } else { - sb.append(orderedBlocks.get(i).getId().getId()); - } - - if (i < orderedBlocks.size() - 1) { - sb.append(","); - } - } - - return sb.toString(); - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java new file mode 100644 index 0000000..fa7ca15 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionQueue.java @@ -0,0 +1,43 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.global; + +import org.apache.tajo.ExecutionBlockId; + +// Retrieves execution blocks to run real works +public interface ExecutionQueue { + + /** + * remaining blocks in queue + * + * @return number of blocks + */ + int size(); + + /** + * return initial blocks to be run + * + * @return blocks to be run + */ + ExecutionBlock[] first(); + + /** + * get next execution blocks to be run + * + * @param blockId currently finished id of execution block + * @return null for finished, can return empty array + */ + ExecutionBlock[] next(ExecutionBlockId blockId); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java index 6e9b74f..22c3751 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java @@ -237,15 +237,13 @@ public class MasterPlan { sb.append("Order of Execution\n"); sb.append("-------------------------------------------------------------------------------"); int order = 1; - while (executionOrderCursor.hasNext()) { - ExecutionBlock currentEB = executionOrderCursor.nextBlock(); + for (ExecutionBlock currentEB : cursor) { sb.append("\n").append(order).append(": ").append(currentEB.getId()); order++; } sb.append("\n-------------------------------------------------------------------------------\n"); - while(cursor.hasNext()) { - ExecutionBlock block = cursor.nextBlock(); + for (ExecutionBlock block : cursor) { boolean terminal = false; sb.append("\n"); http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java new file mode 100644 index 0000000..1e823be --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ParallelExecutionQueue.java @@ -0,0 +1,126 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.global; + +import com.google.common.collect.Iterables; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.ExecutionBlockId; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.Stack; + +public class ParallelExecutionQueue implements ExecutionQueue, Iterable<ExecutionBlock> { + + private static final Log LOG = LogFactory.getLog(ParallelExecutionQueue.class); + + private final int maximum; + private final MasterPlan masterPlan; + private final List<Deque<ExecutionBlock>> executable; + private final Set<ExecutionBlockId> executed = new HashSet<ExecutionBlockId>(); + + public ParallelExecutionQueue(MasterPlan masterPlan, int maximum) { + this.masterPlan = masterPlan; + this.maximum = maximum; + this.executable = toStacks(masterPlan.getRoot()); + } + + private List<Deque<ExecutionBlock>> toStacks(ExecutionBlock root) { + List<Deque<ExecutionBlock>> stacks = new ArrayList<Deque<ExecutionBlock>>(); + toStacks(root, stacks, new ArrayList<ExecutionBlock>()); + return stacks; + } + + // currently, diamond shaped DAG is not supported in tajo + private void toStacks(ExecutionBlock current, List<Deque<ExecutionBlock>> queues, + List<ExecutionBlock> stack) { + stack.add(current); + if (masterPlan.isLeaf(current.getId())) { + queues.add(new ArrayDeque<ExecutionBlock>(stack)); + } else { + List<ExecutionBlock> children = masterPlan.getChilds(current); + for (int i = 0; i < children.size(); i++) { + toStacks(children.get(i), queues, i == 0 ? stack : new Stack<ExecutionBlock>()); + } + } + } + + @Override + public synchronized int size() { + int size = 0; + for (Deque<ExecutionBlock> queue : executable) { + size += queue.size(); + } + return size; + } + + @Override + public synchronized ExecutionBlock[] first() { + int max = Math.min(maximum, executable.size()); + List<ExecutionBlock> result = new ArrayList<ExecutionBlock>(); + for (Deque<ExecutionBlock> queue : executable) { + if (result.size() < max && isExecutableNow(queue.peekLast())) { + result.add(queue.removeLast()); + } + } + LOG.info("Initial executable blocks " + result); + return result.toArray(new ExecutionBlock[result.size()]); + } + + @Override + public synchronized ExecutionBlock[] next(ExecutionBlockId doneNow) { + executed.add(doneNow); + + int remaining = 0; + for (Deque<ExecutionBlock> queue : executable) { + if (!queue.isEmpty() && isExecutableNow(queue.peekLast())) { + LOG.info("Next executable block " + queue.peekLast()); + return new ExecutionBlock[]{queue.removeLast()}; + } + remaining += queue.size(); + } + return remaining > 0 ? new ExecutionBlock[0] : null; + } + + private boolean isExecutableNow(ExecutionBlock current) { + ExecutionBlock parent = masterPlan.getParent(current); + + List<ExecutionBlock> dependents = masterPlan.getChilds(current); + if (parent != null && masterPlan.getChannel(current.getId(), parent.getId()).needShuffle()) { + // add all children of sibling for partitioning + dependents = new ArrayList<ExecutionBlock>(); + for (ExecutionBlock sibling : masterPlan.getChilds(parent)) { + dependents.addAll(masterPlan.getChilds(sibling)); + } + } + for (ExecutionBlock child : dependents) { + if (!executed.contains(child.getId())) { + return false; // there's something should be done before this + } + } + return true; + } + + @Override + public Iterator<ExecutionBlock> iterator() { + return Iterables.concat(executable).iterator(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java index 9148382..9f27eed 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java @@ -47,8 +47,7 @@ public class GlobalPlanEqualityTester implements GlobalPlanRewriteRule { public MasterPlan rewrite(MasterPlan plan) { try { ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan); - while (cursor.hasNext()) { - ExecutionBlock eb = cursor.nextBlock(); + for (ExecutionBlock eb : cursor) { LogicalNode node = eb.getPlan(); if (node != null) { PlanProto.LogicalNodeTree tree = LogicalNodeSerializer.serialize(node); http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java index c26e12c..78cd015 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/ExplainGlobalPlanPreprocessorForTest.java @@ -44,8 +44,7 @@ public class ExplainGlobalPlanPreprocessorForTest { public void prepareTest(MasterPlan plan) { ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan); - while (cursor.hasNext()) { - ExecutionBlock block = cursor.nextBlock(); + for (ExecutionBlock block : cursor) { List<DataChannel> outgoingChannels = plan.getOutgoingChannels(block.getId()); if (outgoingChannels != null) { for (DataChannel channel : outgoingChannels) { http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 45b23f8..562dbc3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -105,8 +105,7 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan); ExecutionBlock leafBlock = null; - while (cursor.hasNext()) { - ExecutionBlock block = cursor.nextBlock(); + for (ExecutionBlock block : cursor) { if (masterPlan.isLeaf(block)) { leafBlock = block; break; http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java index 5f2d33c..c2c0f88 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java @@ -76,8 +76,8 @@ public class WorkerResource { } public int getMemoryMB() { + rlock.lock(); try { - rlock.lock(); return memoryMB; } finally { rlock.unlock(); @@ -85,8 +85,8 @@ public class WorkerResource { } public void setMemoryMB(int memoryMB) { + wlock.lock(); try { - wlock.lock(); this.memoryMB = memoryMB; } finally { wlock.unlock(); @@ -112,8 +112,8 @@ public class WorkerResource { } public int getUsedMemoryMB() { + rlock.lock(); try { - rlock.lock(); return usedMemoryMB; } finally { rlock.unlock(); @@ -121,8 +121,8 @@ public class WorkerResource { } public void setUsedMemoryMB(int usedMemoryMB) { + wlock.lock(); try { - wlock.lock(); this.usedMemoryMB = usedMemoryMB; } finally { wlock.unlock(); @@ -142,9 +142,10 @@ public class WorkerResource { } public void releaseResource(float diskSlots, int memoryMB) { + LOG.info("Disk " + diskSlots + " slot(s), Memory " + memoryMB + " MB"); + wlock.lock(); try { - wlock.lock(); - usedMemoryMB = usedMemoryMB - memoryMB; + usedMemoryMB -= memoryMB; usedDiskSlots -= diskSlots; if(usedMemoryMB < 0) { LOG.warn("Used memory can't be a minus: " + usedMemoryMB); @@ -160,8 +161,9 @@ public class WorkerResource { } public void allocateResource(float diskSlots, int memoryMB) { + LOG.info("Disk " + diskSlots + " slot(s), Memory " + memoryMB + " MB"); + wlock.lock(); try { - wlock.lock(); usedMemoryMB += memoryMB; usedDiskSlots += diskSlots; http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 362dfa6..23808b5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -40,6 +40,7 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; +import org.apache.tajo.engine.planner.global.ExecutionQueue; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.plan.logical.*; import org.apache.tajo.engine.query.QueryContext; @@ -63,12 +64,13 @@ public class Query implements EventHandler<QueryEvent> { // Facilities for Query private final TajoConf systemConf; private final Clock clock; - private String queryStr; - private Map<ExecutionBlockId, Stage> stages; + private final String queryStr; + private final Map<ExecutionBlockId, Stage> stages; private final EventHandler eventHandler; private final MasterPlan plan; QueryMasterTask.QueryMasterTaskContext context; private ExecutionBlockCursor cursor; + private ExecutionQueue execution; // Query Status private final QueryId id; @@ -77,7 +79,7 @@ public class Query implements EventHandler<QueryEvent> { private long finishTime; private TableDesc resultDesc; private int completedStagesCount = 0; - private int successedStagesCount = 0; + private int succeededStagesCount = 0; private int killedStagesCount = 0; private int failedStagesCount = 0; private int erroredStagesCount = 0; @@ -93,7 +95,7 @@ public class Query implements EventHandler<QueryEvent> { private QueryState queryState; // Transition Handler - private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); + private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition(); private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition(); @@ -213,14 +215,12 @@ public class Query implements EventHandler<QueryEvent> { StringBuilder sb = new StringBuilder("\n======================================================="); sb.append("\nThe order of execution: \n"); int order = 1; - while (cursor.hasNext()) { - ExecutionBlock currentEB = cursor.nextBlock(); + for (ExecutionBlock currentEB : cursor) { sb.append("\n").append(order).append(": ").append(currentEB.getId()); order++; } sb.append("\n======================================================="); LOG.info(sb); - cursor.reset(); ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); @@ -381,6 +381,14 @@ public class Query implements EventHandler<QueryEvent> { return cursor; } + public ExecutionQueue newExecutionQueue() { + return execution = cursor.newCursor(); + } + + public ExecutionQueue getExecutionQueue() { + return execution; + } + public static class StartTransition implements SingleArcTransition<Query, QueryEvent> { @@ -388,12 +396,15 @@ public class Query implements EventHandler<QueryEvent> { public void transition(Query query, QueryEvent queryEvent) { query.setStartTime(); - Stage stage = new Stage(query.context, query.getPlan(), - query.getExecutionBlockCursor().nextBlock()); - stage.setPriority(query.priority--); - query.addStage(stage); - stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT)); - LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan()); + ExecutionQueue executionQueue = query.newExecutionQueue(); + for (ExecutionBlock executionBlock : executionQueue.first()) { + Stage stage = new Stage(query.context, query.getPlan(), executionBlock); + stage.setPriority(query.priority--); + query.addStage(stage); + + stage.getEventHandler().handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT)); + LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan()); + } } } @@ -616,25 +627,31 @@ public class Query implements EventHandler<QueryEvent> { public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> { - private boolean hasNext(Query query) { - ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); - ExecutionBlock nextBlock = cursor.peek(); - return !query.getPlan().isTerminal(nextBlock); - } - - private void executeNextBlock(Query query) { - ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); - ExecutionBlock nextBlock = cursor.nextBlock(); - Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock); - nextStage.setPriority(query.priority--); - query.addStage(nextStage); - nextStage.getEventHandler().handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT)); - - LOG.info("Scheduling Stage:" + nextStage.getId()); - if(LOG.isDebugEnabled()) { - LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority()); - LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan()); + // return true for terminal + private synchronized boolean executeNextBlock(Query query, ExecutionBlockId blockId) { + ExecutionQueue cursor = query.getExecutionQueue(); + ExecutionBlock[] nextBlocks = cursor.next(blockId); + if (nextBlocks == null || nextBlocks.length == 0) { + return nextBlocks == null; + } + boolean terminal = true; + for (ExecutionBlock nextBlock : nextBlocks) { + if (query.getPlan().isTerminal(nextBlock)) { + continue; + } + Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock); + nextStage.setPriority(query.priority--); + query.addStage(nextStage); + nextStage.getEventHandler().handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT)); + + LOG.info("Scheduling Stage:" + nextStage.getId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority()); + LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan()); + } + terminal = false; } + return terminal; } @Override @@ -647,7 +664,7 @@ public class Query implements EventHandler<QueryEvent> { StageCompletedEvent castEvent = (StageCompletedEvent) event; if (castEvent.getState() == StageState.SUCCEEDED) { - query.successedStagesCount++; + query.succeededStagesCount++; } else if (castEvent.getState() == StageState.KILLED) { query.killedStagesCount++; } else if (castEvent.getState() == StageState.FAILED) { @@ -663,11 +680,11 @@ public class Query implements EventHandler<QueryEvent> { // if a stage is succeeded and a query is running if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR. - hasNext(query)) { // there remains at least one stage. - executeNextBlock(query); - } else { // if a query is completed due to finished, kill, failure, or error - query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState())); + !executeNextBlock(query, castEvent.getExecutionBlockId())) { + return; } + // if a query is completed due to finished, kill, failure, or error + query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState())); } catch (Throwable t) { LOG.error(t.getMessage(), t); query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR)); http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index f4818f6..9b5980b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -133,7 +133,7 @@ public class TajoTestingCluster { Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName())); conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname)); } - conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024); + conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2048); conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f); @@ -156,14 +156,15 @@ public class TajoTestingCluster { conf.setIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2); // Resource allocator - conf.setIntVar(ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM, 2); + conf.setIntVar(ConfVars.$QUERY_EXECUTE_PARALLEL_MAX, 3); + conf.setIntVar(ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM, 6); // make twice of parallel_max // Memory cache termination conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1); conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString()); - /* Since Travi CI limits the size of standard output log up to 4MB */ + /* Since Travis CI limits the size of standard output log up to 4MB */ if (!StringUtils.isEmpty(LOG_LEVEL)) { Level defaultLevel = Logger.getRootLogger().getLevel(); Logger.getLogger("org.apache.tajo").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); @@ -254,7 +255,7 @@ public class TajoTestingCluster { builder.waitSafeMode(true); this.dfsCluster = builder.build(); - // Set this just-started cluser as our filesystem. + // Set this just-started cluster as our filesystem. this.defaultFS = this.dfsCluster.getFileSystem(); this.conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultFS.getUri().toString()); this.conf.setVar(TajoConf.ConfVars.ROOT_DIR, defaultFS.getUri() + "/tajo"); http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java index c82637d..0f90722 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java @@ -26,6 +26,7 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -115,8 +116,7 @@ public class TestExecutionBlockCursor { ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan); int count = 0; - while(cursor.hasNext()) { - cursor.nextBlock(); + for (ExecutionBlock eb : cursor) { count++; } http://git-wip-us.apache.org/repos/asf/tajo/blob/c2725a77/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index bcd8970..7e741a9 100644 --- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -26,6 +26,7 @@ Available Session Variables: \set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb) \set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb) \set GROUPBY_MULTI_LEVEL_ENABLED [true or false] - Multiple level groupby enabled +\set QUERY_EXECUTE_PARALLEL [int value] - Maximum parallel running of execution blocks for a query \set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb) \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb) \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)
