Repository: tajo
Updated Branches:
  refs/heads/master 45882b624 -> 080bcf772


TAJO-1746: Improve resource usage at first request of DefaultTaskScheduler.

Closes #679


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/080bcf77
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/080bcf77
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/080bcf77

Branch: refs/heads/master
Commit: 080bcf772c10e868f363d80f326d91b56a4e9f62
Parents: 45882b6
Author: Jinho Kim <[email protected]>
Authored: Tue Aug 11 17:33:03 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Tue Aug 11 17:33:03 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../tajo/querymaster/DefaultTaskScheduler.java  | 115 ++++++++++---------
 2 files changed, 61 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/080bcf77/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6c95f4b..1dce87c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -32,6 +32,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1746: Improve resource usage at first request of DefaultTaskScheduler.
+    (jinho)
+
     TAJO-1743: Improve calculation of intermediate table statistics. (jinho)
 
     TAJO-1699: Tajo Java Client version 2. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/080bcf77/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index 52f66ef..f9a5767 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -56,6 +56,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.ResourceProtos.*;
@@ -71,6 +72,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
   private Thread schedulingThread;
   private volatile boolean isStopped;
+  private AtomicBoolean needWakeup = new AtomicBoolean();
 
   private ScheduledRequests scheduledRequests;
 
@@ -96,25 +98,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     scheduledRequests = new ScheduledRequests();
     minTaskMemory = 
tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY);
     schedulerDelay= 
tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_DELAY);
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-    LOG.info("Start TaskScheduler");
-    maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, 
stage.getContext().getWorkerMap().size() * 2);
-    isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock());
-
-    if (isLeaf) {
-      candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts()));
-    } else {
-      //find assigned hosts for Non-Leaf locality in children executionBlock
-      List<ExecutionBlock> executionBlockList = 
stage.getMasterPlan().getChilds(stage.getBlock());
-      for (ExecutionBlock executionBlock : executionBlockList) {
-        Stage childStage = stage.getContext().getStage(executionBlock.getId());
-        candidateWorkers.addAll(childStage.getAssignedWorkerMap().keySet());
-      }
-    }
 
     this.schedulingThread = new Thread() {
       public void run() {
@@ -139,6 +122,25 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         LOG.info("TaskScheduler schedulingThread stopped");
       }
     };
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Start TaskScheduler");
+    maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, 
stage.getContext().getWorkerMap().size() * 2);
+    isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock());
+
+    if (isLeaf) {
+      candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts()));
+    } else {
+      //find assigned hosts for Non-Leaf locality in children executionBlock
+      List<ExecutionBlock> executionBlockList = 
stage.getMasterPlan().getChilds(stage.getBlock());
+      for (ExecutionBlock executionBlock : executionBlockList) {
+        Stage childStage = stage.getContext().getStage(executionBlock.getId());
+        candidateWorkers.addAll(childStage.getAssignedWorkerMap().keySet());
+      }
+    }
 
     this.schedulingThread.start();
     super.start();
@@ -162,13 +164,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   private Fragment[] fragmentsForNonLeafTask;
   private Fragment[] broadcastFragmentsForNonLeafTask;
 
-  public void schedule() throws Exception{
+  public void schedule() throws Exception {
     try {
-      final int incompleteTaskNum = stage.getTotalScheduledObjectsCount() - 
stage.getSucceededObjectCount();
+      final int incompleteTaskNum = scheduledRequests.leafTaskNum() + 
scheduledRequests.nonLeafTaskNum();
       if (incompleteTaskNum == 0) {
-        // all task is done, wait for stopping message
+        needWakeup.set(true);
+        // all task is done or tasks is not scheduled
         synchronized (schedulingThread) {
-          schedulingThread.wait(500);
+          schedulingThread.wait(1000);
         }
       } else {
         LinkedList<TaskRequestEvent> taskRequests = 
createTaskRequest(incompleteTaskNum);
@@ -183,13 +186,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           }
 
           if (isLeaf) {
-            if (scheduledRequests.leafTaskNum() > 0) {
-              scheduledRequests.assignToLeafTasks(taskRequests);
-            }
+            scheduledRequests.assignToLeafTasks(taskRequests);
           } else {
-            if (scheduledRequests.nonLeafTaskNum() > 0) {
-              scheduledRequests.assignToNonLeafTasks(taskRequests);
-            }
+            scheduledRequests.assignToNonLeafTasks(taskRequests);
           }
         }
       }
@@ -250,6 +249,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         } else {
           scheduledRequests.addNonLeafTask(castEvent);
         }
+
+        if (needWakeup.getAndSet(false)) {
+          //wake up scheduler thread after scheduled
+          synchronized (schedulingThread) {
+            schedulingThread.notifyAll();
+          }
+        }
       }
     } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
       // when a stage is killed, unassigned query unit attmpts are canceled 
from the scheduler.
@@ -406,25 +412,18 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       int volumeId = getLowestVolumeId();
       TaskAttemptId taskAttemptId = null;
 
-      increaseConcurrency(volumeId);
       if (unassignedTaskForEachVolume.size() >  0) {
         int retry = unassignedTaskForEachVolume.size();
         do {
           //clean and get a remaining local task
           taskAttemptId = getAndRemove(volumeId);
-          if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
-            decreaseConcurrency(volumeId);
-            if (volumeId > REMOTE) {
-              diskVolumeLoads.remove(volumeId);
-            }
-          }
 
           if (taskAttemptId == null) {
             //reassign next volume
             volumeId = getLowestVolumeId();
-            increaseConcurrency(volumeId);
             retry--;
           } else {
+            lastAssignedVolumeId.put(taskAttemptId, volumeId);
             break;
           }
         } while (retry > 0);
@@ -432,7 +431,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         this.remainTasksNum.set(0);
       }
 
-      lastAssignedVolumeId.put(taskAttemptId, volumeId);
       return taskAttemptId;
     }
 
@@ -446,9 +444,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           int volumeId = getLowestVolumeId();
           taskAttemptId = getAndRemove(volumeId);
           if (taskAttemptId == null) {
-            if (volumeId > REMOTE) {
-              diskVolumeLoads.remove(volumeId);
-            }
             retry--;
           } else {
             break;
@@ -460,10 +455,15 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
     private synchronized TaskAttemptId getAndRemove(int volumeId){
       TaskAttemptId taskAttemptId = null;
-      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return 
taskAttemptId;
+      if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
+        if (volumeId > REMOTE) {
+          diskVolumeLoads.remove(volumeId);
+        }
+        return taskAttemptId;
+      }
 
       LinkedHashSet<TaskAttempt> list = 
unassignedTaskForEachVolume.get(volumeId);
-      if(list != null && list.size() > 0){
+      if (list != null && !list.isEmpty()) {
         TaskAttempt taskAttempt;
         synchronized (unassignedTaskForEachVolume) {
           Iterator<TaskAttempt> iterator = list.iterator();
@@ -471,21 +471,17 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           iterator.remove();
         }
 
-        this.remainTasksNum.getAndDecrement();
         taskAttemptId = taskAttempt.getId();
         for (DataLocation location : taskAttempt.getTask().getDataLocations()) 
{
-          if (!this.getHost().equals(location.getHost())) {
-            HostVolumeMapping volumeMapping = 
scheduledRequests.leafTaskHostMapping.get(location.getHost());
-            if (volumeMapping != null) {
-              volumeMapping.removeTaskAttempt(location.getVolumeId(), 
taskAttempt);
-            }
+          HostVolumeMapping volumeMapping = 
scheduledRequests.leafTaskHostMapping.get(location.getHost());
+          if (volumeMapping != null) {
+            volumeMapping.removeTaskAttempt(location.getVolumeId(), 
taskAttempt);
           }
         }
-      }
 
-      if(list == null || list.isEmpty()) {
-        unassignedTaskForEachVolume.remove(volumeId);
+        increaseConcurrency(volumeId);
       }
+
       return taskAttemptId;
     }
 
@@ -493,12 +489,15 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       if(!unassignedTaskForEachVolume.containsKey(volumeId)) return;
 
       LinkedHashSet<TaskAttempt> tasks  = 
unassignedTaskForEachVolume.get(volumeId);
-
-      if(tasks != null && tasks.size() > 0){
-        tasks.remove(taskAttempt);
+      if(tasks.remove(taskAttempt)) {
         remainTasksNum.getAndDecrement();
-      } else {
+      }
+
+      if(tasks.isEmpty()){
         unassignedTaskForEachVolume.remove(volumeId);
+        if (volumeId > REMOTE) {
+          diskVolumeLoads.remove(volumeId);
+        }
       }
     }
 
@@ -538,7 +537,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         if(concurrency > 0){
           diskVolumeLoads.put(volumeId, concurrency - 1);
         } else {
-          if (volumeId > REMOTE) {
+          if (volumeId > REMOTE && 
!unassignedTaskForEachVolume.containsKey(volumeId)) {
             diskVolumeLoads.remove(volumeId);
           }
         }
@@ -594,6 +593,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   public void cancel(TaskAttempt taskAttempt) {
 
     if(taskAttempt.isLeafTask()) {
+      releaseTaskAttempt(taskAttempt);
+
       List<DataLocation> locations = taskAttempt.getTask().getDataLocations();
 
       for (DataLocation location : locations) {

Reply via email to