Repository: tajo Updated Branches: refs/heads/master 45882b624 -> 080bcf772
TAJO-1746: Improve resource usage at first request of DefaultTaskScheduler. Closes #679 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/080bcf77 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/080bcf77 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/080bcf77 Branch: refs/heads/master Commit: 080bcf772c10e868f363d80f326d91b56a4e9f62 Parents: 45882b6 Author: Jinho Kim <[email protected]> Authored: Tue Aug 11 17:33:03 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Tue Aug 11 17:33:03 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../tajo/querymaster/DefaultTaskScheduler.java | 115 ++++++++++--------- 2 files changed, 61 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/080bcf77/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 6c95f4b..1dce87c 100644 --- a/CHANGES +++ b/CHANGES @@ -32,6 +32,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1746: Improve resource usage at first request of DefaultTaskScheduler. + (jinho) + TAJO-1743: Improve calculation of intermediate table statistics. (jinho) TAJO-1699: Tajo Java Client version 2. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/080bcf77/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index 52f66ef..f9a5767 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -56,6 +56,7 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tajo.ResourceProtos.*; @@ -71,6 +72,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private Thread schedulingThread; private volatile boolean isStopped; + private AtomicBoolean needWakeup = new AtomicBoolean(); private ScheduledRequests scheduledRequests; @@ -96,25 +98,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { scheduledRequests = new ScheduledRequests(); minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY); schedulerDelay= tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_DELAY); - super.init(conf); - } - - @Override - public void start() { - LOG.info("Start TaskScheduler"); - maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, stage.getContext().getWorkerMap().size() * 2); - isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock()); - - if (isLeaf) { - candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts())); - } else { - //find assigned hosts for Non-Leaf locality in children executionBlock - List<ExecutionBlock> executionBlockList = stage.getMasterPlan().getChilds(stage.getBlock()); - for (ExecutionBlock executionBlock : executionBlockList) { - Stage childStage = stage.getContext().getStage(executionBlock.getId()); - candidateWorkers.addAll(childStage.getAssignedWorkerMap().keySet()); - } - } this.schedulingThread = new Thread() { public void run() { @@ -139,6 +122,25 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { LOG.info("TaskScheduler schedulingThread stopped"); } }; + super.init(conf); + } + + @Override + public void start() { + LOG.info("Start TaskScheduler"); + maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, stage.getContext().getWorkerMap().size() * 2); + isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock()); + + if (isLeaf) { + candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts())); + } else { + //find assigned hosts for Non-Leaf locality in children executionBlock + List<ExecutionBlock> executionBlockList = stage.getMasterPlan().getChilds(stage.getBlock()); + for (ExecutionBlock executionBlock : executionBlockList) { + Stage childStage = stage.getContext().getStage(executionBlock.getId()); + candidateWorkers.addAll(childStage.getAssignedWorkerMap().keySet()); + } + } this.schedulingThread.start(); super.start(); @@ -162,13 +164,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private Fragment[] fragmentsForNonLeafTask; private Fragment[] broadcastFragmentsForNonLeafTask; - public void schedule() throws Exception{ + public void schedule() throws Exception { try { - final int incompleteTaskNum = stage.getTotalScheduledObjectsCount() - stage.getSucceededObjectCount(); + final int incompleteTaskNum = scheduledRequests.leafTaskNum() + scheduledRequests.nonLeafTaskNum(); if (incompleteTaskNum == 0) { - // all task is done, wait for stopping message + needWakeup.set(true); + // all task is done or tasks is not scheduled synchronized (schedulingThread) { - schedulingThread.wait(500); + schedulingThread.wait(1000); } } else { LinkedList<TaskRequestEvent> taskRequests = createTaskRequest(incompleteTaskNum); @@ -183,13 +186,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } if (isLeaf) { - if (scheduledRequests.leafTaskNum() > 0) { - scheduledRequests.assignToLeafTasks(taskRequests); - } + scheduledRequests.assignToLeafTasks(taskRequests); } else { - if (scheduledRequests.nonLeafTaskNum() > 0) { - scheduledRequests.assignToNonLeafTasks(taskRequests); - } + scheduledRequests.assignToNonLeafTasks(taskRequests); } } } @@ -250,6 +249,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } else { scheduledRequests.addNonLeafTask(castEvent); } + + if (needWakeup.getAndSet(false)) { + //wake up scheduler thread after scheduled + synchronized (schedulingThread) { + schedulingThread.notifyAll(); + } + } } } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) { // when a stage is killed, unassigned query unit attmpts are canceled from the scheduler. @@ -406,25 +412,18 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { int volumeId = getLowestVolumeId(); TaskAttemptId taskAttemptId = null; - increaseConcurrency(volumeId); if (unassignedTaskForEachVolume.size() > 0) { int retry = unassignedTaskForEachVolume.size(); do { //clean and get a remaining local task taskAttemptId = getAndRemove(volumeId); - if(!unassignedTaskForEachVolume.containsKey(volumeId)) { - decreaseConcurrency(volumeId); - if (volumeId > REMOTE) { - diskVolumeLoads.remove(volumeId); - } - } if (taskAttemptId == null) { //reassign next volume volumeId = getLowestVolumeId(); - increaseConcurrency(volumeId); retry--; } else { + lastAssignedVolumeId.put(taskAttemptId, volumeId); break; } } while (retry > 0); @@ -432,7 +431,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { this.remainTasksNum.set(0); } - lastAssignedVolumeId.put(taskAttemptId, volumeId); return taskAttemptId; } @@ -446,9 +444,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { int volumeId = getLowestVolumeId(); taskAttemptId = getAndRemove(volumeId); if (taskAttemptId == null) { - if (volumeId > REMOTE) { - diskVolumeLoads.remove(volumeId); - } retry--; } else { break; @@ -460,10 +455,15 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private synchronized TaskAttemptId getAndRemove(int volumeId){ TaskAttemptId taskAttemptId = null; - if(!unassignedTaskForEachVolume.containsKey(volumeId)) return taskAttemptId; + if(!unassignedTaskForEachVolume.containsKey(volumeId)) { + if (volumeId > REMOTE) { + diskVolumeLoads.remove(volumeId); + } + return taskAttemptId; + } LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId); - if(list != null && list.size() > 0){ + if (list != null && !list.isEmpty()) { TaskAttempt taskAttempt; synchronized (unassignedTaskForEachVolume) { Iterator<TaskAttempt> iterator = list.iterator(); @@ -471,21 +471,17 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { iterator.remove(); } - this.remainTasksNum.getAndDecrement(); taskAttemptId = taskAttempt.getId(); for (DataLocation location : taskAttempt.getTask().getDataLocations()) { - if (!this.getHost().equals(location.getHost())) { - HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost()); - if (volumeMapping != null) { - volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt); - } + HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost()); + if (volumeMapping != null) { + volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt); } } - } - if(list == null || list.isEmpty()) { - unassignedTaskForEachVolume.remove(volumeId); + increaseConcurrency(volumeId); } + return taskAttemptId; } @@ -493,12 +489,15 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { if(!unassignedTaskForEachVolume.containsKey(volumeId)) return; LinkedHashSet<TaskAttempt> tasks = unassignedTaskForEachVolume.get(volumeId); - - if(tasks != null && tasks.size() > 0){ - tasks.remove(taskAttempt); + if(tasks.remove(taskAttempt)) { remainTasksNum.getAndDecrement(); - } else { + } + + if(tasks.isEmpty()){ unassignedTaskForEachVolume.remove(volumeId); + if (volumeId > REMOTE) { + diskVolumeLoads.remove(volumeId); + } } } @@ -538,7 +537,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { if(concurrency > 0){ diskVolumeLoads.put(volumeId, concurrency - 1); } else { - if (volumeId > REMOTE) { + if (volumeId > REMOTE && !unassignedTaskForEachVolume.containsKey(volumeId)) { diskVolumeLoads.remove(volumeId); } } @@ -594,6 +593,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { public void cancel(TaskAttempt taskAttempt) { if(taskAttempt.isLeafTask()) { + releaseTaskAttempt(taskAttempt); + List<DataLocation> locations = taskAttempt.getTask().getDataLocations(); for (DataLocation location : locations) {
