Repository: tajo
Updated Branches:
  refs/heads/master 2c9305add -> f2c434332

TAJO-1707: Rack local count can be more than actual number of tasks.

Closes #717

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f2c43433
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f2c43433
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f2c43433

Branch: refs/heads/master
Commit: f2c434332822fbf53d9b22810509baf5c9718e56
Parents: 2c9305a
Author: Jinho Kim <[email protected]>
Authored: Wed Sep 2 17:04:13 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Wed Sep 2 17:04:13 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../tajo/querymaster/AbstractTaskScheduler.java |  6 ++++
 .../tajo/querymaster/DefaultTaskScheduler.java  | 38 ++++++++++----------
 3 files changed, 29 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/f2c43433/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b2ace54..23291e5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -234,6 +234,9 @@ Release 0.11.0 - unreleased

   BUG FIXES

+    TAJO-1707: Rack local count can be more than actual number of tasks.
+    (jinho)
+
     TAJO-1808: Wrong table type problem in catalog. (jihoon)

     TAJO-1801: Table name is not unique of tableDescMap in QueryMasterTask.
http://git-wip-us.apache.org/repos/asf/tajo/blob/f2c43433/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
index 8636eaa..1651bb3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
@@ -32,6 +32,8 @@ public abstract class AbstractTaskScheduler extends AbstractService implements E
   protected int rackLocalAssigned;
   protected int totalAssigned;
   protected int cancellation;
+  protected int totalAttempts;
+
   protected Set<String> leafTaskHosts = Sets.newHashSet();

   /**
@@ -59,6 +61,10 @@ public abstract class AbstractTaskScheduler extends AbstractService implements E
     return cancellation;
   }

+  public int getTotalAttempts() {
+    return totalAttempts;
+  }
+
   public abstract void releaseTaskAttempt(TaskAttempt taskAttempt);

   public abstract int remainingScheduledObjectNum();

http://git-wip-us.apache.org/repos/asf/tajo/blob/f2c43433/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index f9a5767..d380295 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -98,6 +98,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     scheduledRequests = new ScheduledRequests();
     minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY);
     schedulerDelay= tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_DELAY);
+    isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock());

     this.schedulingThread = new Thread() {
       public void run() {
@@ -129,7 +130,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   public void start() {
     LOG.info("Start TaskScheduler");
     maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, stage.getContext().getWorkerMap().size() * 2);
-    isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock());

     if (isLeaf) {
       candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts()));
@@ -686,9 +686,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           //find remaining local task
           if (leafTasks.contains(attemptId)) {
             leafTasks.remove(attemptId);
-            //LOG.info(attemptId + " Assigned based on host match " + hostName);
-            hostLocalAssigned++;
-            totalAssigned++;
             return attemptId;
           }
         }
@@ -755,15 +752,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         }
       }

-      if (attemptId != null) {
-        rackLocalAssigned++;
-        totalAssigned++;
-
-        LOG.info(String.format("Assigned Local/Rack/Cancel/Total: (%d/%d/%d/%d), Locality: %.2f%%, Rack host: %s",
-            hostLocalAssigned, rackLocalAssigned, cancellation, totalAssigned,
-            ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
-
-      }
       return attemptId;
     }

@@ -775,6 +763,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {

       TaskRequestEvent taskRequest;
       while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) {
+        int localAssign = 0;
+        int rackAssign = 0;
+
         taskRequest = taskRequests.pollFirst();
         if(taskRequest == null) { // if there are only remote task requests
           taskRequest = remoteTaskRequests.pollFirst();
@@ -855,13 +846,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               synchronized (leafTasks){
                 attemptId = leafTasks.iterator().next();
                 leafTasks.remove(attemptId);
-                rackLocalAssigned++;
-                totalAssigned++;
-                LOG.info(String.format("Assigned Local/Remote/Cancel/Total: (%d/%d/%d/%d), Locality: %.2f%%,",
-                    hostLocalAssigned, rackLocalAssigned, cancellation, totalAssigned,
-                    ((double) hostLocalAssigned / (double) totalAssigned) * 100));
               }
             }
+            rackAssign++;
+          } else {
+            localAssign++;
           }

           if (attemptId != null) {
@@ -894,6 +883,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {

           AsyncRpcClient tajoWorkerRpc = null;
           CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>();
+          totalAttempts++;
           try {
             tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true);
             TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
@@ -917,6 +907,18 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             LOG.error(e);
           }
           scheduledObjectNum--;
+          totalAssigned++;
+          hostLocalAssigned += localAssign;
+          rackLocalAssigned += rackAssign;
+
+          if (rackAssign > 0) {
+            LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), " +
+                "Attempted Cancel/Assign/Total: (%d/%d/%d), " +
+                "Locality: %.2f%%, Rack host: %s",
+                hostLocalAssigned, rackLocalAssigned, totalAssigned,
+                cancellation, totalAssigned, totalAttempts,
+                ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
+          }
         } else {
           throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
