TAJO-1049: Remove the parallel degree limit up to the maximum cluster capacity.
Closes #147

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e5f7c959
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e5f7c959
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e5f7c959

Branch: refs/heads/block_iteration
Commit: e5f7c9591c71c998e702c789d249af20da3eeb6b
Parents: 1b3d51e
Author: Hyunsik Choi <[email protected]>
Authored: Sun Sep 21 11:14:06 2014 -0700
Committer: Hyunsik Choi <[email protected]>
Committed: Sun Sep 21 11:14:06 2014 -0700

----------------------------------------------------------------------
 CHANGES                                          | 10 +++++++---
 .../apache/tajo/master/querymaster/SubQuery.java | 17 ++---------------
 2 files changed, 9 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/e5f7c959/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 995cd27..3933ab9 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,11 +31,15 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1049: Remove the parallel degree limit up to the maximum cluster
+    capacity. (hyunsik)
+
     TAJO-1046: Remove hadoop native dependency of pullserver. (jinho)
 
     TAJO-1040: Misuse netty HashedWheelTimer. (jinho)
 
-    TAJO-1034: Reduce Explicit Use of JVM Internal Class. (Jihun Kang via hyunsik)
+    TAJO-1034: Reduce Explicit Use of JVM Internal Class.
+    (Jihun Kang via hyunsik)
 
     TAJO-1027: Upgrade Hive to 0.13.0 and 0.13.1. (jaehwa)
 
@@ -44,8 +48,8 @@ Release 0.9.0 - unreleased
     TAJO-937: Should use tajo.util.VersionInfo instead of TajoConstants.TAJO_VERSION.
     (Mai Hai Thanh via hyunsik)
 
-    TAJO-991: Running PullServer on a dedicated JVM process which separates from worker.
-    (Hyoungjun Kim)
+    TAJO-991: Running PullServer on a dedicated JVM process which separates
+    from worker. (Hyoungjun Kim)
 
     TAJO-906: Runtime code generation for evaluating expression trees.
     (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/e5f7c959/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 1931300..919ac9b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -774,14 +774,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         int mb = (int) Math.ceil((double) bigger / 1048576);
         LOG.info(subQuery.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
 
-        int taskNum = (int) Math.ceil((double) mb /
-            masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
-
-        int totalMem = getClusterTotalMemory(subQuery);
-        LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB");
-        int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
-        // determine the number of task
-        taskNum = Math.min(taskNum, slots);
+        int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
 
         if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) {
           taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM);
@@ -827,14 +820,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB);
       LOG.info(subQuery.getId() + ", Table's volume is approximately " + volumeByMB + " MB");
       // determine the number of task
-      int taskNumBySize = (int) Math.ceil((double) volumeByMB /
+      int taskNum = (int) Math.ceil((double) volumeByMB /
           masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE));
-
-      int totalMem = getClusterTotalMemory(subQuery);
-
-      LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB");
-      int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
-      int taskNum = Math.min(taskNumBySize, slots); //Maximum partitions
       LOG.info(subQuery.getId() + ", The determined number of aggregation partitions is " + taskNum);
       return taskNum;
     }
