http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEvent.java
new file mode 100644
index 0000000..856c572
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEvent.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class SchedulerEvent extends AbstractEvent<SchedulerEventType> {
+  public SchedulerEvent(SchedulerEventType type) {
+    super(type);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEventType.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEventType.java
new file mode 100644
index 0000000..93fa032
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEventType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler.event;
+
+public enum SchedulerEventType {
+
+  // consumer: Scheduler
+  RESOURCE_RESERVE,
+  RESOURCE_UPDATE
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
 
b/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
index 229a80a..200d689 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java
@@ -22,8 +22,8 @@ import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricSet;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerState;
+import org.apache.tajo.master.rm.NodeStatus;
+import org.apache.tajo.master.rm.NodeState;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -40,31 +40,31 @@ public class WorkerResourceMetricsGaugeSet implements 
MetricSet {
     metricsMap.put("totalWorkers", new Gauge<Integer>() {
       @Override
       public Integer getValue() {
-        return tajoMasterContext.getResourceManager().getWorkers().size();
+        return tajoMasterContext.getResourceManager().getNodes().size();
       }
     });
 
     metricsMap.put("liveWorkers", new Gauge<Integer>() {
       @Override
       public Integer getValue() {
-        return getNumWorkers(WorkerState.RUNNING);
+        return getNumWorkers(NodeState.RUNNING);
       }
     });
 
     metricsMap.put("deadWorkers", new Gauge<Integer>() {
       @Override
       public Integer getValue() {
-        return getNumWorkers(WorkerState.LOST);
+        return getNumWorkers(NodeState.LOST);
       }
     });
 
     return metricsMap;
   }
 
-  protected int getNumWorkers(WorkerState status) {
+  protected int getNumWorkers(NodeState status) {
     int numWorkers = 0;
-    for(Worker eachWorker: 
tajoMasterContext.getResourceManager().getWorkers().values()) {
-      if(eachWorker.getState() == status) {
+    for(NodeStatus eachNodeStatus : 
tajoMasterContext.getResourceManager().getNodes().values()) {
+      if(eachNodeStatus.getState() == status) {
         numWorkers++;
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
index e45f274..8636eaa 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java
@@ -18,17 +18,21 @@
 
 package org.apache.tajo.querymaster;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.master.event.TaskRequestEvent;
 import org.apache.tajo.master.event.TaskSchedulerEvent;
 
+import java.util.Set;
+
 
 public abstract class AbstractTaskScheduler extends AbstractService implements 
EventHandler<TaskSchedulerEvent> {
 
   protected int hostLocalAssigned;
   protected int rackLocalAssigned;
   protected int totalAssigned;
+  protected int cancellation;
+  protected Set<String> leafTaskHosts = Sets.newHashSet();
 
   /**
    * Construct the service.
@@ -51,6 +55,15 @@ public abstract class AbstractTaskScheduler extends 
AbstractService implements E
     return totalAssigned;
   }
 
-  public abstract void handleTaskRequestEvent(TaskRequestEvent event);
+  public int getCancellation() {
+    return cancellation;
+  }
+
+  public abstract void releaseTaskAttempt(TaskAttempt taskAttempt);
   public abstract int remainingScheduledObjectNum();
+
+
+  public Set<String> getLeafTaskHosts(){
+    return leafTaskHosts;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index 939de60..32e4219 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -20,56 +20,69 @@ package org.apache.tajo.querymaster;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.TaskRequest;
 import org.apache.tajo.engine.query.TaskRequestImpl;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import 
org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.ContainerProxy;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.*;
 import 
org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.plan.serder.LogicalNodeSerializer;
-import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.rpc.*;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.FetchImpl;
 
+import java.net.InetSocketAddress;
 import java.util.*;
 import java.util.Map.Entry;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.ResourceProtos.*;
 
 public class DefaultTaskScheduler extends AbstractTaskScheduler {
   private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
 
+  private static final String REQUEST_MAX_NUM = 
"tajo.qm.task-scheduler.request.max-num";
+
   private final TaskSchedulerContext context;
   private Stage stage;
+  private TajoConf tajoConf;
 
   private Thread schedulingThread;
-  private AtomicBoolean stopEventHandling = new AtomicBoolean(false);
+  private volatile boolean isStopped;
 
   private ScheduledRequests scheduledRequests;
-  private TaskRequests taskRequests;
 
+  private int minTaskMemory;
   private int nextTaskId = 0;
   private int scheduledObjectNum = 0;
+  private boolean isLeaf;
+  private int schedulerDelay;
+  private int maximumRequestContainer;
+
+  // worker ids preferred as candidates when reserving resources, to keep tasks local
+  private Set<Integer> candidateWorkers = Sets.newHashSet();
 
   public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) {
     super(DefaultTaskScheduler.class.getName());
@@ -79,30 +92,47 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
 
   @Override
   public void init(Configuration conf) {
-
+    tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
     scheduledRequests = new ScheduledRequests();
-    taskRequests  = new TaskRequests();
-
+    minTaskMemory = 
tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY);
+    schedulerDelay= 
tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_DELAY);
     super.init(conf);
   }
 
   @Override
   public void start() {
     LOG.info("Start TaskScheduler");
+    maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, 
stage.getContext().getWorkerMap().size() * 2);
+    isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock());
+
+    if (isLeaf) {
+      candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts()));
+    } else {
+      // for non-leaf locality, collect the workers assigned to the child execution blocks
+      List<ExecutionBlock> executionBlockList = 
stage.getMasterPlan().getChilds(stage.getBlock());
+      for (ExecutionBlock executionBlock : executionBlockList) {
+        Stage childStage = stage.getContext().getStage(executionBlock.getId());
+        candidateWorkers.addAll(childStage.getAssignedWorkerMap().keySet());
+      }
+    }
 
     this.schedulingThread = new Thread() {
       public void run() {
 
-        while(!stopEventHandling.get() && 
!Thread.currentThread().isInterrupted()) {
+        while (!isStopped && !Thread.currentThread().isInterrupted()) {
+
           try {
-            synchronized (schedulingThread){
-              schedulingThread.wait(100);
-            }
             schedule();
           } catch (InterruptedException e) {
-            break;
+            if (isStopped) {
+              break;
+            } else {
+              LOG.fatal(e.getMessage(), e);
+              stage.abort(StageState.ERROR);
+            }
           } catch (Throwable e) {
             LOG.fatal(e.getMessage(), e);
+            stage.abort(StageState.ERROR);
             break;
           }
         }
@@ -114,41 +144,17 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
     super.start();
   }
 
-  private static final TaskAttemptId NULL_ATTEMPT_ID;
-  public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
-  static {
-    ExecutionBlockId nullStage = 
QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
-    NULL_ATTEMPT_ID = 
QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0);
-
-    TajoWorkerProtocol.TaskRequestProto.Builder builder =
-        TajoWorkerProtocol.TaskRequestProto.newBuilder();
-    builder.setId(NULL_ATTEMPT_ID.getProto());
-    builder.setShouldDie(true);
-    builder.setOutputTable("");
-    builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
-    builder.setClusteredOutput(false);
-    stopTaskRunnerReq = builder.build();
-  }
-
   @Override
   public void stop() {
-    if(stopEventHandling.getAndSet(true)){
-      return;
-    }
+    isStopped = true;
 
     if (schedulingThread != null) {
       synchronized (schedulingThread) {
-        schedulingThread.notifyAll();
+        schedulingThread.interrupt();
       }
     }
-
-    // Return all of request callbacks instantly.
-    if(taskRequests != null){
-      for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
-        req.getCallback().run(stopTaskRunnerReq);
-      }
-    }
-
+    candidateWorkers.clear();
+    scheduledRequests.clear();
     LOG.info("Task Scheduler stopped");
     super.stop();
   }
@@ -156,34 +162,38 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
   private Fragment[] fragmentsForNonLeafTask;
   private Fragment[] broadcastFragmentsForNonLeafTask;
 
-  LinkedList<TaskRequestEvent> taskRequestEvents = new 
LinkedList<TaskRequestEvent>();
-  public void schedule() {
-
-    if (taskRequests.size() > 0) {
-      if (scheduledRequests.leafTaskNum() > 0) {
-        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
-            taskRequests.size() + ", LeafTask Schedule Request: " +
-            scheduledRequests.leafTaskNum());
-        taskRequests.getTaskRequests(taskRequestEvents,
-            scheduledRequests.leafTaskNum());
-        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
-        if (taskRequestEvents.size() > 0) {
-          scheduledRequests.assignToLeafTasks(taskRequestEvents);
-          taskRequestEvents.clear();
+  public void schedule() throws Exception{
+    try {
+      if (remainingScheduledObjectNum() == 0) {
+        // all tasks are done; wait for the stop message
+        synchronized (schedulingThread) {
+          schedulingThread.wait(500);
         }
-      }
-    }
+      } else {
+        LinkedList<TaskRequestEvent> taskRequests = createTaskRequest();
 
-    if (taskRequests.size() > 0) {
-      if (scheduledRequests.nonLeafTaskNum() > 0) {
-        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
-            taskRequests.size() + ", NonLeafTask Schedule Request: " +
-            scheduledRequests.nonLeafTaskNum());
-        taskRequests.getTaskRequests(taskRequestEvents,
-            scheduledRequests.nonLeafTaskNum());
-        scheduledRequests.assignToNonLeafTasks(taskRequestEvents);
-        taskRequestEvents.clear();
+        if (taskRequests.size() == 0) {
+          synchronized (schedulingThread) {
+            schedulingThread.wait(schedulerDelay);
+          }
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Get " + taskRequests.size() + " taskRequestEvents ");
+          }
+
+          if (isLeaf) {
+            if (scheduledRequests.leafTaskNum() > 0) {
+              scheduledRequests.assignToLeafTasks(taskRequests);
+            }
+          } else {
+            if (scheduledRequests.nonLeafTaskNum() > 0) {
+              scheduledRequests.assignToNonLeafTasks(taskRequests);
+            }
+          }
+        }
       }
+    } catch (TimeoutException e) {
+      LOG.error(e.getMessage());
     }
   }
 
@@ -251,19 +261,55 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
     }
   }
 
-  @Override
-  public void handleTaskRequestEvent(TaskRequestEvent event) {
-
-    taskRequests.handle(event);
-    int hosts = scheduledRequests.leafTaskHostMapping.size();
+  private Set<Integer> getWorkerIds(Collection<String> hosts){
+    Set<Integer> workerIds = Sets.newHashSet();
+    if(hosts.isEmpty()) return workerIds;
 
-    // if available cluster resource are large then tasks, the scheduler 
thread are working immediately.
-    if(remainingScheduledObjectNum() > 0 &&
-        (remainingScheduledObjectNum() <= hosts || hosts <= 
taskRequests.size())){
-      synchronized (schedulingThread){
-        schedulingThread.notifyAll();
+    for (WorkerConnectionInfo worker : 
stage.getContext().getWorkerMap().values()) {
+      if(hosts.contains(worker.getHost())){
+        workerIds.add(worker.getId());
       }
     }
+    return workerIds;
+  }
+
+
+  protected LinkedList<TaskRequestEvent> createTaskRequest() throws Exception {
+    LinkedList<TaskRequestEvent> taskRequestEvents = new 
LinkedList<TaskRequestEvent>();
+
+    // If the scheduled tasks are long-running, the cluster load can become badly unbalanced.
+    // Throttle the maximum number of containers requested per call to limit that.
+    int requestContainerNum = Math.min(remainingScheduledObjectNum(), 
maximumRequestContainer);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Try to schedule task resources: " + requestContainerNum);
+    }
+
+    ServiceTracker serviceTracker =
+        
context.getMasterContext().getQueryMasterContext().getWorkerContext().getServiceTracker();
+    NettyClientBase tmClient = RpcClientManager.getInstance().
+        getClient(serviceTracker.getUmbilicalAddress(), 
QueryCoordinatorProtocol.class, true);
+    QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
+
+    CallFuture<NodeResourceResponse> callBack = new 
CallFuture<NodeResourceResponse>();
+    NodeResourceRequest.Builder request = NodeResourceRequest.newBuilder();
+    request.setCapacity(NodeResources.createResource(minTaskMemory, isLeaf ? 1 
: 0).getProto())
+        .setNumContainers(requestContainerNum)
+        .setPriority(stage.getPriority())
+        .setQueryId(context.getMasterContext().getQueryId().getProto())
+        .setType(isLeaf ? ResourceType.LEAF : ResourceType.INTERMEDIATE)
+        .setUserId(context.getMasterContext().getQueryContext().getUser())
+        .setRunningTasks(stage.getTotalScheduledObjectsCount() - 
stage.getCompletedTaskCount())
+        .addAllCandidateNodes(candidateWorkers)
+        .setQueue(context.getMasterContext().getQueryContext().get("queue", 
"default")); //TODO set queue
+
+    masterClientService.reserveNodeResources(callBack.getController(), 
request.build(), callBack);
+    NodeResourceResponse response = 
callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+    for (AllocationResourceProto resource : response.getResourceList()) {
+      taskRequestEvents.add(new TaskRequestEvent(resource.getWorkerId(), 
resource, context.getBlockId()));
+    }
+
+    return taskRequestEvents;
   }
 
   @Override
@@ -271,43 +317,16 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
     return scheduledObjectNum;
   }
 
-  private class TaskRequests implements EventHandler<TaskRequestEvent> {
-    private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
-        new LinkedBlockingQueue<TaskRequestEvent>();
-
-    @Override
-    public void handle(TaskRequestEvent event) {
-      if(LOG.isDebugEnabled()){
-        LOG.debug("TaskRequest: " + event.getContainerId() + "," + 
event.getExecutionBlockId());
-      }
+  public void releaseTaskAttempt(TaskAttempt taskAttempt) {
+    if (taskAttempt.isLeafTask() && taskAttempt.getWorkerConnectionInfo() != 
null) {
 
-      if(stopEventHandling.get()) {
-        event.getCallback().run(stopTaskRunnerReq);
-        return;
+      HostVolumeMapping mapping =
+          
scheduledRequests.leafTaskHostMapping.get(taskAttempt.getWorkerConnectionInfo().getHost());
+      if (mapping != null && 
mapping.lastAssignedVolumeId.containsKey(taskAttempt.getId())) {
+        
mapping.decreaseConcurrency(mapping.lastAssignedVolumeId.remove(taskAttempt.getId()));
       }
-      int qSize = taskRequestQueue.size();
-      if (qSize != 0 && qSize % 1000 == 0) {
-        LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
-      }
-      int remCapacity = taskRequestQueue.remainingCapacity();
-      if (remCapacity < 1000) {
-        LOG.warn("Very low remaining capacity in the event-queue "
-            + "of DefaultTaskScheduler: " + remCapacity);
-      }
-
-      taskRequestQueue.add(event);
-    }
-
-    public void getTaskRequests(final Collection<TaskRequestEvent> 
taskRequests,
-                                int num) {
-      taskRequestQueue.drainTo(taskRequests, num);
-    }
-
-    public int size() {
-      return taskRequestQueue.size();
     }
   }
-
   /**
    * One worker can have multiple running task runners. 
<code>HostVolumeMapping</code>
    * describes various information for one worker, including :
@@ -342,8 +361,7 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
     private Map<Integer, LinkedHashSet<TaskAttempt>> 
unassignedTaskForEachVolume =
         Collections.synchronizedMap(new HashMap<Integer, 
LinkedHashSet<TaskAttempt>>());
     /** A value is last assigned volume id for each task runner */
-    private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new 
HashMap<TajoContainerId,
-      Integer>();
+    private HashMap<TaskAttemptId, Integer> lastAssignedVolumeId = 
Maps.newHashMap();
     /**
      * A key is disk volume id, and a value is the load of this volume.
      * This load is measured by counting how many number of tasks are running.
@@ -383,24 +401,18 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
      *  2. unknown block or Non-splittable task in host
      *  3. remote tasks. unassignedTaskForEachVolume is only contained local 
task. so it will be null
      */
-    public synchronized TaskAttemptId getLocalTask(TajoContainerId 
containerId) {
-      int volumeId;
+    public synchronized TaskAttemptId getLocalTask() {
+      int volumeId = getLowestVolumeId();
       TaskAttemptId taskAttemptId = null;
 
-      if (!lastAssignedVolumeId.containsKey(containerId)) {
-        volumeId = getLowestVolumeId();
-        increaseConcurrency(containerId, volumeId);
-      } else {
-        volumeId = lastAssignedVolumeId.get(containerId);
-      }
-
+      increaseConcurrency(volumeId);
       if (unassignedTaskForEachVolume.size() >  0) {
         int retry = unassignedTaskForEachVolume.size();
         do {
           //clean and get a remaining local task
           taskAttemptId = getAndRemove(volumeId);
           if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
-            decreaseConcurrency(containerId);
+            decreaseConcurrency(volumeId);
             if (volumeId > REMOTE) {
               diskVolumeLoads.remove(volumeId);
             }
@@ -409,7 +421,7 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
           if (taskAttemptId == null) {
             //reassign next volume
             volumeId = getLowestVolumeId();
-            increaseConcurrency(containerId, volumeId);
+            increaseConcurrency(volumeId);
             retry--;
           } else {
             break;
@@ -418,6 +430,8 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
       } else {
         this.remainTasksNum.set(0);
       }
+
+      lastAssignedVolumeId.put(taskAttemptId, volumeId);
       return taskAttemptId;
     }
 
@@ -490,11 +504,10 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
     /**
      * Increase the count of running tasks and disk loads for a certain task 
runner.
      *
-     * @param containerId The task runner identifier
      * @param volumeId Volume identifier
      * @return the volume load (i.e., how many running tasks use this volume)
      */
-    private synchronized int increaseConcurrency(TajoContainerId containerId, 
int volumeId) {
+    private synchronized int increaseConcurrency(int volumeId) {
 
       int concurrency = 1;
       if (diskVolumeLoads.containsKey(volumeId)) {
@@ -512,16 +525,14 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
             + ", Remote Concurrency : " + concurrency);
       }
       diskVolumeLoads.put(volumeId, concurrency);
-      lastAssignedVolumeId.put(containerId, volumeId);
       return concurrency;
     }
 
     /**
      * Decrease the count of running tasks of a certain task runner
      */
-    private synchronized void decreaseConcurrency(TajoContainerId containerId){
-      Integer volumeId = lastAssignedVolumeId.get(containerId);
-      if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){
+    private synchronized void decreaseConcurrency(int volumeId){
+      if(diskVolumeLoads.containsKey(volumeId)){
         Integer concurrency = diskVolumeLoads.get(volumeId);
         if(concurrency > 0){
           diskVolumeLoads.put(volumeId, concurrency - 1);
@@ -531,7 +542,6 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
           }
         }
       }
-      lastAssignedVolumeId.remove(containerId);
     }
 
     /**
@@ -557,19 +567,6 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
       }
     }
 
-    public boolean isAssigned(TajoContainerId containerId){
-      return lastAssignedVolumeId.containsKey(containerId);
-    }
-
-    public boolean isRemote(TajoContainerId containerId){
-      Integer volumeId = lastAssignedVolumeId.get(containerId);
-      if(volumeId == null || volumeId > REMOTE){
-        return false;
-      } else {
-        return true;
-      }
-    }
-
     public int getRemoteConcurrency(){
       return getVolumeConcurrency(REMOTE);
     }
@@ -585,7 +582,6 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
     }
 
     public String getHost() {
-
       return host;
     }
 
@@ -594,6 +590,25 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
     }
   }
 
+  public void cancel(TaskAttempt taskAttempt) {
+
+    if(taskAttempt.isLeafTask()) {
+      List<DataLocation> locations = taskAttempt.getTask().getDataLocations();
+
+      for (DataLocation location : locations) {
+        HostVolumeMapping volumeMapping = 
scheduledRequests.leafTaskHostMapping.get(location.getHost());
+        volumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt);
+      }
+
+      scheduledRequests.leafTasks.add(taskAttempt.getId());
+    } else {
+      scheduledRequests.nonLeafTasks.add(taskAttempt.getId());
+    }
+
+    context.getMasterContext().getEventHandler().handle(
+        new TaskAttemptEvent(taskAttempt.getId(), 
TaskAttemptEventType.TA_ASSIGN_CANCEL));
+  }
+
   private class ScheduledRequests {
     // two list leafTasks and nonLeafTasks keep all tasks to be scheduled. 
Even though some task is included in
     // leafTaskHostMapping or leafTasksRackMapping, some task T will not be 
sent to a task runner
@@ -603,12 +618,20 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
     private Map<String, HostVolumeMapping> leafTaskHostMapping = 
Maps.newConcurrentMap();
     private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = 
Maps.newConcurrentMap();
 
-    private synchronized void addLeafTask(TaskAttemptToSchedulerEvent event) {
+    protected void clear() {
+      leafTasks.clear();
+      nonLeafTasks.clear();
+      leafTaskHostMapping.clear();
+      leafTasksRackMapping.clear();
+    }
+
+    private void addLeafTask(TaskAttemptToSchedulerEvent event) {
       TaskAttempt taskAttempt = event.getTaskAttempt();
       List<DataLocation> locations = taskAttempt.getTask().getDataLocations();
 
       for (DataLocation location : locations) {
         String host = location.getHost();
+        leafTaskHosts.add(host);
 
         HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
         if (hostVolumeMapping == null) {
@@ -650,14 +673,12 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
       return nonLeafTasks.size();
     }
 
-    public Set<TaskAttemptId> assignedRequest = new HashSet<TaskAttemptId>();
-
-    private TaskAttemptId allocateLocalTask(String host, TajoContainerId 
containerId){
+    private TaskAttemptId allocateLocalTask(String host){
       HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
 
       if (hostVolumeMapping != null) { //tajo host is located in hadoop 
datanode
         for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); 
i++) {
-          TaskAttemptId attemptId = 
hostVolumeMapping.getLocalTask(containerId);
+          TaskAttemptId attemptId = hostVolumeMapping.getLocalTask();
 
           if(attemptId == null) break;
           //find remaining local task
@@ -736,8 +757,8 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
         rackLocalAssigned++;
         totalAssigned++;
 
-        LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), 
Locality: %.2f%%, Rack host: %s",
-            hostLocalAssigned, rackLocalAssigned, totalAssigned,
+        LOG.info(String.format("Assigned Local/Rack/Cancel/Total: 
(%d/%d/%d/%d), Locality: %.2f%%, Rack host: %s",
+            hostLocalAssigned, rackLocalAssigned, cancellation, totalAssigned,
             ((double) hostLocalAssigned / (double) totalAssigned) * 100, 
host));
 
       }
@@ -747,6 +768,8 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
     public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
       Collections.shuffle(taskRequests);
       LinkedList<TaskRequestEvent> remoteTaskRequests = new 
LinkedList<TaskRequestEvent>();
+      String queryMasterHostAndPort = 
context.getMasterContext().getQueryMasterContext().getWorkerContext().
+          getConnectionInfo().getHostAndQMPort();
 
       TaskRequestEvent taskRequest;
       while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || 
!remoteTaskRequests.isEmpty())) {
@@ -757,58 +780,61 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
 
         // checking if this container is still alive.
         // If not, ignore the task request and stop the task runner
-        ContainerProxy container = 
context.getMasterContext().getResourceAllocator()
-            .getContainer(taskRequest.getContainerId());
-        if(container == null) {
-          taskRequest.getCallback().run(stopTaskRunnerReq);
-          continue;
-        }
+        WorkerConnectionInfo connectionInfo = 
context.getMasterContext().getWorkerMap().get(taskRequest.getWorkerId());
+        if(connectionInfo == null) continue;
 
         // getting the hostname of requested node
-        WorkerConnectionInfo connectionInfo =
-            
context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId());
         String host = connectionInfo.getHost();
 
         // if there are no worker matched to the hostname a task request
-        if(!leafTaskHostMapping.containsKey(host)){
+        if (!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()) 
{
           String normalizedHost = NetUtils.normalizeHost(host);
 
-          if(!leafTaskHostMapping.containsKey(normalizedHost) && 
!taskRequests.isEmpty()){
+          if (!leafTaskHostMapping.containsKey(normalizedHost)) {
             // this case means one of either cases:
             // * there are no blocks which reside in this node.
             // * all blocks which reside in this node are consumed, and this 
task runner requests a remote task.
             // In this case, we transfer the task request to the remote task 
request list, and skip the followings.
             remoteTaskRequests.add(taskRequest);
             continue;
+          } else {
+            host = normalizedHost;
           }
         }
 
-        TajoContainerId containerId = taskRequest.getContainerId();
-        LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + 
"," +
-            "containerId=" + containerId);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() 
+ "," +
+              "worker=" + connectionInfo.getHostAndPeerRpcPort());
+        }
 
         //////////////////////////////////////////////////////////////////////
         // disk or host-local allocation
         //////////////////////////////////////////////////////////////////////
-        TaskAttemptId attemptId = allocateLocalTask(host, containerId);
+        TaskAttemptId attemptId = allocateLocalTask(host);
 
         if (attemptId == null) { // if a local task cannot be found
           HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
 
-          if(hostVolumeMapping != null) {
-            if(!hostVolumeMapping.isRemote(containerId)){
-              // assign to remote volume
-              hostVolumeMapping.decreaseConcurrency(containerId);
-              hostVolumeMapping.increaseConcurrency(containerId, 
HostVolumeMapping.REMOTE);
-            }
-            // this part is remote concurrency management of a tail tasks
-            int tailLimit = Math.max(remainingScheduledObjectNum() / 
(leafTaskHostMapping.size() * 2), 1);
+          if(!taskRequests.isEmpty()) { //if other requests remains, move to 
remote list for better locality
+            remoteTaskRequests.add(taskRequest);
+            candidateWorkers.remove(connectionInfo.getId());
+            continue;
 
-            if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
-              //release container
-              hostVolumeMapping.decreaseConcurrency(containerId);
-              taskRequest.getCallback().run(stopTaskRunnerReq);
-              continue;
+          } else {
+            if(hostVolumeMapping != null) {
+              int nodes = context.getMasterContext().getWorkerMap().size();
+              //this part is to control the assignment of tail and remote task 
balancing per node
+              int tailLimit = 1;
+              if (remainingScheduledObjectNum() > 0) {
+                tailLimit = Math.max(remainingScheduledObjectNum() / nodes, 1);
+              }
+
+              if (hostVolumeMapping.getRemoteConcurrency() >= tailLimit) { 
//remote task throttling per node
+                continue;
+              } else {
+                // assign to remote volume
+                
hostVolumeMapping.increaseConcurrency(HostVolumeMapping.REMOTE);
+              }
             }
           }
 
@@ -816,6 +842,9 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
           // rack-local allocation
           
//////////////////////////////////////////////////////////////////////
           attemptId = allocateRackTask(host);
+          if (attemptId != null && hostVolumeMapping != null) {
+            hostVolumeMapping.lastAssignedVolumeId.put(attemptId, 
HostVolumeMapping.REMOTE);
+          }
 
           
//////////////////////////////////////////////////////////////////////
           // random node allocation
@@ -826,8 +855,8 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
               leafTasks.remove(attemptId);
               rackLocalAssigned++;
               totalAssigned++;
-              LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), 
Locality: %.2f%%,",
-                  hostLocalAssigned, rackLocalAssigned, totalAssigned,
+              LOG.info(String.format("Assigned Local/Remote/Cancel/Total: 
(%d/%d/%d/%d), Locality: %.2f%%,",
+                  hostLocalAssigned, rackLocalAssigned, cancellation, 
totalAssigned,
                   ((double) hostLocalAssigned / (double) totalAssigned) * 
100));
             }
           }
@@ -842,17 +871,51 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
               false,
               LogicalNodeSerializer.serialize(task.getLogicalPlan()),
               context.getMasterContext().getQueryContext(),
-              stage.getDataChannel(), stage.getBlock().getEnforcer());
+              stage.getDataChannel(), stage.getBlock().getEnforcer(),
+              queryMasterHostAndPort);
+
           if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
             taskAssign.setInterQuery();
           }
 
-          context.getMasterContext().getEventHandler().handle(new 
TaskAttemptAssignedEvent(attemptId,
-              taskRequest.getContainerId(), connectionInfo));
-          assignedRequest.add(attemptId);
+          //TODO send batch request
+          BatchAllocationRequest.Builder requestProto = 
BatchAllocationRequest.newBuilder();
+          requestProto.addTaskRequest(TaskAllocationProto.newBuilder()
+              .setResource(taskRequest.getResponseProto().getResource())
+              .setTaskRequest(taskAssign.getProto()).build());
+
+          
requestProto.setExecutionBlockId(attemptId.getTaskId().getExecutionBlockId().getProto());
+          context.getMasterContext().getEventHandler().handle(new 
TaskAttemptAssignedEvent(attemptId, connectionInfo));
 
+          InetSocketAddress addr = 
stage.getAssignedWorkerMap().get(connectionInfo.getId());
+          if (addr == null) addr = new 
InetSocketAddress(connectionInfo.getHost(), connectionInfo.getPeerRpcPort());
+
+          AsyncRpcClient tajoWorkerRpc = null;
+          CallFuture<BatchAllocationResponse> callFuture = new 
CallFuture<BatchAllocationResponse>();
+          try {
+            tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, 
TajoWorkerProtocol.class, true);
+            TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = 
tajoWorkerRpc.getStub();
+            tajoWorkerRpcClient.allocateTasks(callFuture.getController(), 
requestProto.build(), callFuture);
+
+            BatchAllocationResponse responseProto =
+                callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+
+            if (responseProto.getCancellationTaskCount() > 0) {
+              for (TaskAllocationProto proto : 
responseProto.getCancellationTaskList()) {
+                cancel(task.getAttempt(new 
TaskAttemptId(proto.getTaskRequest().getId())));
+                cancellation++;
+              }
+
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("Canceled requests: " + 
responseProto.getCancellationTaskCount() + " from " +  addr);
+              }
+              continue;
+            }
+          } catch (Exception e) {
+            LOG.error(e);
+          }
           scheduledObjectNum--;
-          taskRequest.getCallback().run(taskAssign.getProto());
+
         } else {
           throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
         }
@@ -874,6 +937,8 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
 
     public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> 
taskRequests) {
       Collections.shuffle(taskRequests);
+      String queryMasterHostAndPort = 
context.getMasterContext().getQueryMasterContext().getWorkerContext().
+          getConnectionInfo().getHostAndQMPort();
 
       TaskRequestEvent taskRequest;
       while (!taskRequests.isEmpty()) {
@@ -891,6 +956,7 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
 
           Task task;
           task = stage.getTask(attemptId.getTaskId());
+
           TaskRequest taskAssign = new TaskRequestImpl(
               attemptId,
               Lists.newArrayList(task.getAllFragments()),
@@ -899,7 +965,9 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
               LogicalNodeSerializer.serialize(task.getLogicalPlan()),
               context.getMasterContext().getQueryContext(),
               stage.getDataChannel(),
-              stage.getBlock().getEnforcer());
+              stage.getBlock().getEnforcer(),
+              queryMasterHostAndPort);
+
           if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
             taskAssign.setInterQuery();
           }
@@ -912,13 +980,49 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
             }
           }
 
-          WorkerConnectionInfo connectionInfo = 
context.getMasterContext().getResourceAllocator().
-              getWorkerConnectionInfo(taskRequest.getWorkerId());
-          context.getMasterContext().getEventHandler().handle(new 
TaskAttemptAssignedEvent(attemptId,
-              taskRequest.getContainerId(), connectionInfo));
-          taskRequest.getCallback().run(taskAssign.getProto());
-          totalAssigned++;
-          scheduledObjectNum--;
+          WorkerConnectionInfo connectionInfo =
+              
context.getMasterContext().getWorkerMap().get(taskRequest.getWorkerId());
+
+          //TODO send batch request
+          BatchAllocationRequest.Builder requestProto = 
BatchAllocationRequest.newBuilder();
+          requestProto.addTaskRequest(TaskAllocationProto.newBuilder()
+              .setResource(taskRequest.getResponseProto().getResource())
+              .setTaskRequest(taskAssign.getProto()).build());
+
+          
requestProto.setExecutionBlockId(attemptId.getTaskId().getExecutionBlockId().getProto());
+          context.getMasterContext().getEventHandler().handle(new 
TaskAttemptAssignedEvent(attemptId, connectionInfo));
+
+          CallFuture<BatchAllocationResponse> callFuture = new 
CallFuture<BatchAllocationResponse>();
+
+          InetSocketAddress addr = 
stage.getAssignedWorkerMap().get(connectionInfo.getId());
+          if (addr == null) addr = new 
InetSocketAddress(connectionInfo.getHost(), connectionInfo.getPeerRpcPort());
+
+          AsyncRpcClient tajoWorkerRpc;
+          try {
+            tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, 
TajoWorkerProtocol.class, true);
+            TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = 
tajoWorkerRpc.getStub();
+            tajoWorkerRpcClient.allocateTasks(callFuture.getController(), 
requestProto.build(), callFuture);
+
+            BatchAllocationResponse
+                responseProto = 
callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+            if(responseProto.getCancellationTaskCount() > 0) {
+              for (TaskAllocationProto proto : 
responseProto.getCancellationTaskList()) {
+                cancel(task.getAttempt(new 
TaskAttemptId(proto.getTaskRequest().getId())));
+                cancellation++;
+              }
+
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("Canceled requests: " + 
responseProto.getCancellationTaskCount() + " from " +  addr);
+              }
+              continue;
+            }
+
+            totalAssigned++;
+            scheduledObjectNum--;
+          } catch (Exception e) {
+            LOG.error(e);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 9d5838d..6fc4ea9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -174,7 +174,16 @@ public class Query implements EventHandler<QueryEvent> {
               QueryEventType.KILL,
               QUERY_COMPLETED_TRANSITION)
 
-          // Transitions from FAILED state
+              // Transitions from KILLED state
+              // ignore-able transitions
+          .addTransition(QueryState.QUERY_KILLED, QueryState.QUERY_KILLED,
+              EnumSet.of(QueryEventType.START, QueryEventType.QUERY_COMPLETED,
+                  QueryEventType.KILL, QueryEventType.INTERNAL_ERROR))
+          .addTransition(QueryState.QUERY_KILLED, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+              // Transitions from FAILED state
           .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
               QueryEventType.DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
@@ -305,7 +314,6 @@ public class Query implements EventHandler<QueryEvent> {
     queryHistory.setQueryId(getId().toString());
     
queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName());
     
queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort());
-    queryHistory.setLogicalPlan(plan.toString());
     queryHistory.setLogicalPlan(plan.getLogicalPlan().toString());
     queryHistory.setDistributedPlan(plan.toString());
 
@@ -704,8 +712,17 @@ public class Query implements EventHandler<QueryEvent> {
             !executeNextBlock(query, castEvent.getExecutionBlockId())) {
           return;
         }
-         // if a query is completed due to finished, kill, failure, or error
-        query.eventHandler.handle(new 
QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+
+        // wait until all stages are completed
+        if (query.completedStagesCount >= query.stages.size()) {
+          // if a query is completed due to finished, kill, failure, or error
+          query.eventHandler.handle(new 
QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+        }
+        LOG.info(String.format("Complete Stage[%s], State: %s, %d/%d. ",
+            castEvent.getExecutionBlockId().toString(),
+            castEvent.getState().toString(),
+            query.completedStagesCount,
+            query.stages.size()));
       } catch (Throwable t) {
         LOG.error(t.getMessage(), t);
         query.eventHandler.handle(new QueryEvent(event.getQueryId(), 
QueryEventType.INTERNAL_ERROR));

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 6c5bd22..e07b43f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -18,8 +18,8 @@
 
 package org.apache.tajo.querymaster;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.collections.map.LRUMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -29,19 +29,24 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tajo.*;
+import org.apache.tajo.QueryId;
+
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
+import 
org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService;
+import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest;
+import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse;
+import org.apache.tajo.ResourceProtos.WorkerConnectionsResponse;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.master.event.QueryStopEvent;
 import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.history.HistoryReader;
 import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.worker.TajoWorker;
 
@@ -52,7 +57,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public class QueryMaster extends CompositeService implements EventHandler {
   private static final Log LOG = 
LogFactory.getLog(QueryMaster.class.getName());
@@ -69,11 +73,12 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
 
   private Map<QueryId, QueryMasterTask> queryMasterTasks = 
Maps.newConcurrentMap();
 
-  private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = 
Maps.newConcurrentMap();
+  private final LRUMap
+      finishedQueryMasterTasksCache = new 
LRUMap(HistoryReader.DEFAULT_PAGE_SIZE);
 
   private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
 
-  private AtomicBoolean queryMasterStop = new AtomicBoolean(false);
+  private volatile boolean isStopped;
 
   private QueryMasterContext queryMasterContext;
 
@@ -89,44 +94,37 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
 
   private ExecutorService eventExecutor;
 
+  private ExecutorService singleEventExecutor;
+
   public QueryMaster(TajoWorker.WorkerContext workerContext) {
     super(QueryMaster.class.getName());
     this.workerContext = workerContext;
   }
 
-  public void init(Configuration conf) {
-    LOG.info("QueryMaster init");
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("conf should be a TajoConf type");
-    }
-    try {
-      this.systemConf = (TajoConf)conf;
-      this.manager = RpcClientManager.getInstance();
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
 
-      querySessionTimeout = 
systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
-      queryMasterContext = new QueryMasterContext(systemConf);
+    this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+    this.manager = RpcClientManager.getInstance();
 
-      clock = new SystemClock();
+    querySessionTimeout = 
systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
+    queryMasterContext = new QueryMasterContext(systemConf);
 
-      this.dispatcher = new AsyncDispatcher();
-      addIfService(dispatcher);
+    clock = new SystemClock();
 
-      globalPlanner = new GlobalPlanner(systemConf, workerContext);
+    this.dispatcher = new AsyncDispatcher();
+    addIfService(dispatcher);
 
-      dispatcher.register(QueryStartEvent.EventType.class, new 
QueryStartEventHandler());
-      dispatcher.register(QueryStopEvent.EventType.class, new 
QueryStopEventHandler());
+    globalPlanner = new GlobalPlanner(systemConf, workerContext);
 
-    } catch (Throwable t) {
-      LOG.error(t.getMessage(), t);
-      throw new RuntimeException(t);
-    }
-    super.init(conf);
+    dispatcher.register(QueryStartEvent.EventType.class, new 
QueryStartEventHandler());
+    dispatcher.register(QueryStopEvent.EventType.class, new 
QueryStopEventHandler());
+    super.serviceInit(conf);
+    LOG.info("QueryMaster inited");
   }
 
   @Override
-  public void start() {
-    LOG.info("QueryMaster start");
-
+  public void serviceStart() throws Exception {
     queryHeartbeatThread = new QueryHeartbeatThread();
     queryHeartbeatThread.start();
 
@@ -136,15 +134,15 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
     finishedQueryMasterTaskCleanThread = new 
FinishedQueryMasterTaskCleanThread();
     finishedQueryMasterTaskCleanThread.start();
 
-    eventExecutor = Executors.newSingleThreadExecutor();
-    super.start();
+    eventExecutor = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    singleEventExecutor = Executors.newSingleThreadExecutor();
+    super.serviceStart();
+    LOG.info("QueryMaster started");
   }
 
   @Override
-  public void stop() {
-    if(queryMasterStop.getAndSet(true)){
-      return;
-    }
+  public void serviceStop() throws Exception {
+    isStopped = true;
 
     if(queryHeartbeatThread != null) {
       queryHeartbeatThread.interrupt();
@@ -162,60 +160,15 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
       eventExecutor.shutdown();
     }
 
-    super.stop();
-
-    LOG.info("QueryMaster stopped");
-  }
-
-  protected void 
cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> 
executionBlockIds) {
-    StringBuilder cleanupMessage = new StringBuilder();
-    String prefix = "";
-    for (TajoIdProtos.ExecutionBlockIdProto eachEbId: executionBlockIds) {
-      cleanupMessage.append(prefix).append(new 
ExecutionBlockId(eachEbId).toString());
-      prefix = ",";
+    if(singleEventExecutor != null){
+      singleEventExecutor.shutdown();
     }
-    LOG.info("cleanup executionBlocks: " + cleanupMessage);
-    NettyClientBase rpc = null;
-    List<WorkerResourceProto> workers = getAllWorker();
-    TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = 
TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
-    builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
-    TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = 
builder.build();
 
-    for (WorkerResourceProto worker : workers) {
-      try {
-        TajoProtos.WorkerConnectionInfoProto connectionInfo = 
worker.getConnectionInfo();
-        rpc = 
manager.getClient(NetUtils.createSocketAddr(connectionInfo.getHost(), 
connectionInfo.getPeerRpcPort()),
-            TajoWorkerProtocol.class, true);
-        TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService 
= rpc.getStub();
-
-        tajoWorkerProtocolService.cleanupExecutionBlocks(null, 
executionBlockListProto, NullCallback.get());
-      } catch (Exception e) {
-        LOG.warn("Ignoring exception. " + e.getMessage(), e);
-        continue;
-      }
-    }
-  }
-
-  private void cleanup(QueryId queryId) {
-    LOG.info("cleanup query resources : " + queryId);
-    NettyClientBase rpc = null;
-    List<WorkerResourceProto> workers = getAllWorker();
-
-    for (WorkerResourceProto worker : workers) {
-      try {
-        TajoProtos.WorkerConnectionInfoProto connectionInfo = 
worker.getConnectionInfo();
-        rpc = 
manager.getClient(NetUtils.createSocketAddr(connectionInfo.getHost(), 
connectionInfo.getPeerRpcPort()),
-            TajoWorkerProtocol.class, true);
-        TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService 
= rpc.getStub();
-
-        tajoWorkerProtocolService.cleanup(null, queryId.getProto(), 
NullCallback.get());
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-      }
-    }
+    super.serviceStop();
+    LOG.info("QueryMaster stopped");
   }
 
-  public List<WorkerResourceProto> getAllWorker() {
+  public List<TajoProtos.WorkerConnectionInfoProto> getAllWorker() {
 
     NettyClientBase rpc = null;
     try {
@@ -228,16 +181,17 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
       rpc = manager.getClient(serviceTracker.getUmbilicalAddress(), 
QueryCoordinatorProtocol.class, true);
       QueryCoordinatorProtocolService masterService = rpc.getStub();
 
-      CallFuture<WorkerResourcesRequest> callBack = new 
CallFuture<WorkerResourcesRequest>();
-      masterService.getAllWorkerResource(callBack.getController(),
+      CallFuture<WorkerConnectionsResponse> callBack = new 
CallFuture<WorkerConnectionsResponse>();
+      masterService.getAllWorkers(callBack.getController(),
           PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
 
-      WorkerResourcesRequest workerResourcesRequest = callBack.get(2, 
TimeUnit.SECONDS);
-      return workerResourcesRequest.getWorkerResourcesList();
+      WorkerConnectionsResponse connectionsProto =
+          callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+      return connectionsProto.getWorkerList();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     }
-    return new ArrayList<WorkerResourceProto>();
+    return new ArrayList<TajoProtos.WorkerConnectionInfoProto>();
   }
 
   @Override
@@ -253,13 +207,14 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
     return queryMasterTasks.get(queryId);
   }
 
+  @Deprecated
   public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean 
includeFinished) {
     QueryMasterTask queryMasterTask =  queryMasterTasks.get(queryId);
     if(queryMasterTask != null) {
       return queryMasterTask;
     } else {
       if(includeFinished) {
-        return finishedQueryMasterTasks.get(queryId);
+        return (QueryMasterTask) finishedQueryMasterTasksCache.get(queryId);
       } else {
         return null;
       }
@@ -274,8 +229,9 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
     return queryMasterTasks.values();
   }
 
+  @Deprecated
   public Collection<QueryMasterTask> getFinishedQueryMasterTasks() {
-    return finishedQueryMasterTasks.values();
+    return finishedQueryMasterTasksCache.values();
   }
 
   public class QueryMasterContext {
@@ -293,6 +249,10 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
       return eventExecutor;
     }
 
+    public ExecutorService getSingleEventExecutor(){
+      return singleEventExecutor;
+    }
+
     public AsyncDispatcher getDispatcher() {
       return dispatcher;
     }
@@ -324,12 +284,12 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
         return;
       }
 
-      finishedQueryMasterTasks.put(queryId, queryMasterTask);
+      finishedQueryMasterTasksCache.put(queryId, queryMasterTask);
 
-      TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
+      TajoHeartbeatRequest queryHeartbeat = 
buildTajoHeartBeat(queryMasterTask);
       CallFuture<TajoHeartbeatResponse> future = new 
CallFuture<TajoHeartbeatResponse>();
 
-      NettyClientBase tmClient = null;
+      NettyClientBase tmClient;
       try {
         tmClient = 
manager.getClient(workerContext.getServiceTracker().getUmbilicalAddress(),
             QueryCoordinatorProtocol.class, true);
@@ -346,9 +306,6 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
 
       try {
         queryMasterTask.stop();
-        if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) {
-          cleanup(queryId);
-        }
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
       }
@@ -367,8 +324,8 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
     }
   }
 
-  private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
-    TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
+  private TajoHeartbeatRequest buildTajoHeartBeat(QueryMasterTask 
queryMasterTask) {
+    TajoHeartbeatRequest.Builder builder = TajoHeartbeatRequest.newBuilder();
 
     builder.setConnectionInfo(workerContext.getConnectionInfo().getProto());
     builder.setQueryId(queryMasterTask.getQueryId().getProto());
@@ -387,7 +344,7 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
     public void handle(QueryStartEvent event) {
       LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
       QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
-          event.getQueryId(), event.getSession(), event.getQueryContext(), 
event.getJsonExpr());
+          event.getQueryId(), event.getSession(), event.getQueryContext(), 
event.getJsonExpr(), event.getAllocation());
 
       synchronized(queryMasterTasks) {
         queryMasterTasks.put(event.getQueryId(), queryMasterTask);
@@ -402,7 +359,6 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
 
       if (queryMasterTask.isInitError()) {
         queryMasterContext.stopQuery(queryMasterTask.getQueryId());
-        return;
       }
     }
   }
@@ -422,31 +378,29 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
     @Override
     public void run() {
       LOG.info("Start QueryMaster heartbeat thread");
-      while(!queryMasterStop.get()) {
+      while(!isStopped) {
         List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
-        synchronized(queryMasterTasks) {
-          tempTasks.addAll(queryMasterTasks.values());
-        }
-        synchronized(queryMasterTasks) {
-          for(QueryMasterTask eachTask: tempTasks) {
-            NettyClientBase tmClient;
-            try {
+        tempTasks.addAll(queryMasterTasks.values());
+
+        for(QueryMasterTask eachTask: tempTasks) {
+          NettyClientBase tmClient;
+          try {
 
-              ServiceTracker serviceTracker = 
queryMasterContext.getWorkerContext().getServiceTracker();
-              tmClient = 
manager.getClient(serviceTracker.getUmbilicalAddress(),
-                  QueryCoordinatorProtocol.class, true);
-              QueryCoordinatorProtocolService masterClientService = 
tmClient.getStub();
+            ServiceTracker serviceTracker = 
queryMasterContext.getWorkerContext().getServiceTracker();
+            tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(),
+                QueryCoordinatorProtocol.class, true);
+            QueryCoordinatorProtocolService masterClientService = 
tmClient.getStub();
 
-              TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
-              masterClientService.heartbeat(null, queryHeartbeat, 
NullCallback.get());
-            } catch (Throwable t) {
-              t.printStackTrace();
-            }
+            TajoHeartbeatRequest queryHeartbeat = buildTajoHeartBeat(eachTask);
+            masterClientService.heartbeat(null, queryHeartbeat, 
NullCallback.get());
+          } catch (Throwable t) {
+            t.printStackTrace();
           }
         }
-        synchronized(queryMasterStop) {
+
+        synchronized(this) {
           try {
-            queryMasterStop.wait(2000);
+            this.wait(2000);
           } catch (InterruptedException e) {
             break;
           }
@@ -459,16 +413,16 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
   class ClientSessionTimeoutCheckThread extends Thread {
     public void run() {
       LOG.info("ClientSessionTimeoutCheckThread started");
-      while(!queryMasterStop.get()) {
+      while(!isStopped) {
         try {
-          Thread.sleep(1000);
+          synchronized (this) {
+            this.wait(1000);
+          }
         } catch (InterruptedException e) {
           break;
         }
         List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
-        synchronized(queryMasterTasks) {
-          tempTasks.addAll(queryMasterTasks.values());
-        }
+        tempTasks.addAll(queryMasterTasks.values());
 
         for(QueryMasterTask eachTask: tempTasks) {
           if(!eachTask.isStopped()) {
@@ -490,11 +444,13 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
 
   class FinishedQueryMasterTaskCleanThread extends Thread {
     public void run() {
-      int expireIntervalTime = 
systemConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_HISTORY_EXPIRE_PERIOD);
+      int expireIntervalTime = 
systemConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_CACHE_EXPIRE_PERIOD);
       LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval 
minutes = " + expireIntervalTime);
-      while(!queryMasterStop.get()) {
+      while(!isStopped) {
         try {
-          Thread.sleep(60 * 1000);  // minimum interval minutes
+          synchronized (this) {
+            this.wait(60 * 1000);  // minimum interval minutes
+          }
         } catch (InterruptedException e) {
           break;
         }
@@ -508,26 +464,25 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
     }
 
     private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
-      synchronized(finishedQueryMasterTasks) {
-        List<QueryId> expiredQueryIds = new ArrayList<QueryId>();
-        for(Map.Entry<QueryId, QueryMasterTask> entry: 
finishedQueryMasterTasks.entrySet()) {
-
+      List<QueryId> expiredQueryIds = new ArrayList<QueryId>();
+      for(Object key: new 
ArrayList<Object>(finishedQueryMasterTasksCache.keySet())) {
+        QueryId queryId = (QueryId) key;
           /* If a query are abnormal termination, the finished time will be 
zero. */
-          long finishedTime = entry.getValue().getStartTime();
-          Query query = entry.getValue().getQuery();
-          if (query != null && query.getFinishTime() > 0) {
-            finishedTime = query.getFinishTime();
-          }
-
-          if(finishedTime < expireTime) {
-            expiredQueryIds.add(entry.getKey());
-          }
+        QueryMasterTask queryMasterTask = (QueryMasterTask) 
finishedQueryMasterTasksCache.get(queryId);
+        long finishedTime = queryMasterTask.getStartTime();
+        Query query = queryMasterTask.getQuery();
+        if (query != null && query.getFinishTime() > 0) {
+          finishedTime = query.getFinishTime();
         }
 
-        for(QueryId eachId: expiredQueryIds) {
-          finishedQueryMasterTasks.remove(eachId);
+        if(finishedTime < expireTime) {
+          expiredQueryIds.add(queryId);
         }
       }
+
+      for(QueryId eachId: expiredQueryIds) {
+        finishedQueryMasterTasksCache.remove(eachId);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
index 59933a7..62216aa 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.querymaster;
 
-import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
@@ -27,19 +26,26 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.tajo.*;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.*;
-import org.apache.tajo.session.Session;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TajoWorker;
+import org.apache.tajo.worker.event.QMResourceAllocateEvent;
 
 import java.net.InetSocketAddress;
 
+import static org.apache.tajo.ResourceProtos.*;
+
 public class QueryMasterManagerService extends CompositeService
     implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
   private static final Log LOG = 
LogFactory.getLog(QueryMasterManagerService.class.getName());
@@ -64,51 +70,41 @@ public class QueryMasterManagerService extends 
CompositeService
   }
 
   @Override
-  public void init(Configuration conf) {
-    Preconditions.checkArgument(conf instanceof TajoConf);
-    TajoConf tajoConf = (TajoConf) conf;
-    try {
-      // Setup RPC server
-      InetSocketAddress initIsa =
-          new InetSocketAddress("0.0.0.0", port);
-      if (initIsa.getAddress() == null) {
-        throw new IllegalArgumentException("Failed resolve of " + initIsa);
-      }
-
-      int workerNum = 
tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
-      this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, 
initIsa, workerNum);
-      this.rpcServer.start();
+  public void serviceInit(Configuration conf) throws Exception {
+
+    TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+    // Setup RPC server
+    InetSocketAddress initIsa =
+        new InetSocketAddress("0.0.0.0", port);
+    if (initIsa.getAddress() == null) {
+      throw new IllegalArgumentException("Failed resolve of " + initIsa);
+    }
 
-      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
-      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+    int workerNum = 
tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+    this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, 
initIsa, workerNum);
+    this.rpcServer.start();
 
-      this.port = bindAddr.getPort();
+    this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+    this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
 
-      queryMaster = new QueryMaster(workerContext);
-      addService(queryMaster);
+    this.port = bindAddr.getPort();
 
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
+    queryMaster = new QueryMaster(workerContext);
+    addService(queryMaster);
     // Get the master address
     LOG.info("QueryMasterManagerService is bind to " + addr);
-    ((TajoConf)conf).setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);
+    tajoConf.setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);
 
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
-  public void start() {
-    super.start();
-  }
-
-  @Override
-  public void stop() {
+  public void serviceStop() throws Exception {
     if(rpcServer != null) {
       rpcServer.shutdown();
     }
     LOG.info("QueryMasterManagerService stopped");
-    super.stop();
+    super.serviceStop();
   }
 
   public InetSocketAddress getBindAddr() {
@@ -116,52 +112,31 @@ public class QueryMasterManagerService extends 
CompositeService
   }
 
   @Override
-  public void getTask(RpcController controller, 
TajoWorkerProtocol.GetTaskRequestProto request,
-                      RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) {
-    try {
-      ExecutionBlockId ebId = new 
ExecutionBlockId(request.getExecutionBlockId());
-      QueryMasterTask queryMasterTask = 
workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
-
-      if(queryMasterTask == null || queryMasterTask.isStopped()) {
-        done.run(DefaultTaskScheduler.stopTaskRunnerReq);
-      } else {
-        TajoContainerId cid =
-            
queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
-        LOG.debug("getTask:" + cid + ", ebId:" + ebId);
-        queryMasterTask.handleTaskRequestEvent(new 
TaskRequestEvent(request.getWorkerId(), cid, ebId, done));
-      }
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      controller.setFailed(e.getMessage());
-    }
-  }
-
-  @Override
-  public void statusUpdate(RpcController controller, 
TajoWorkerProtocol.TaskStatusProto request,
+  public void statusUpdate(RpcController controller, TaskStatusProto request,
                            RpcCallback<PrimitiveProtos.NullProto> done) {
     QueryId queryId = new 
QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
     TaskAttemptId attemptId = new TaskAttemptId(request.getId());
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
-    if (queryMasterTask == null) {
-      queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
-    }
-    Stage sq = 
queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
-    Task task = sq.getTask(attemptId.getTaskId());
-    TaskAttempt attempt = task.getAttempt(attemptId.getId());
 
-    if(LOG.isDebugEnabled()){
-      LOG.debug(String.format("Task State: %s, Attempt State: %s", 
task.getState().name(), attempt.getState().name()));
-    }
+    if (queryMasterTask != null) {
+      Stage sq = 
queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
+      Task task = sq.getTask(attemptId.getTaskId());
+      TaskAttempt attempt = task.getAttempt(attemptId.getId());
 
-    if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
-      LOG.warn(attemptId + " Killed");
-      attempt.handle(
-          new TaskAttemptEvent(new TaskAttemptId(request.getId()), 
TaskAttemptEventType.TA_LOCAL_KILLED));
-    } else {
-      queryMasterTask.getEventHandler().handle(
-          new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), 
request));
-    }
+      if(LOG.isDebugEnabled()){
+        LOG.debug(String.format("Task State: %s, Attempt State: %s", 
task.getState().name(), attempt.getState().name()));
+      }
+
+      if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+        LOG.warn(attemptId + " Killed");
+        attempt.handle(
+            new TaskAttemptEvent(new TaskAttemptId(request.getId()), 
TaskAttemptEventType.TA_LOCAL_KILLED));
+      } else {
+        queryMasterTask.getEventHandler().handle(
+            new TaskAttemptStatusUpdateEvent(new 
TaskAttemptId(request.getId()), request));
+      }
 
+    }
     done.run(TajoWorker.NULL_PROTO);
   }
 
@@ -173,7 +148,7 @@ public class QueryMasterManagerService extends 
CompositeService
   }
 
   @Override
-  public void fatalError(RpcController controller, 
TajoWorkerProtocol.TaskFatalErrorReport report,
+  public void fatalError(RpcController controller, TaskFatalErrorReport report,
                          RpcCallback<PrimitiveProtos.NullProto> done) {
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
         new 
QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
@@ -186,7 +161,7 @@ public class QueryMasterManagerService extends 
CompositeService
   }
 
   @Override
-  public void done(RpcController controller, 
TajoWorkerProtocol.TaskCompletionReport report,
+  public void done(RpcController controller, TaskCompletionReport report,
                    RpcCallback<PrimitiveProtos.NullProto> done) {
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
         new 
QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
@@ -198,7 +173,7 @@ public class QueryMasterManagerService extends 
CompositeService
 
   @Override
   public void doneExecutionBlock(
-      RpcController controller, TajoWorkerProtocol.ExecutionBlockReport 
request,
+      RpcController controller, ExecutionBlockReport request,
       RpcCallback<PrimitiveProtos.NullProto> done) {
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new 
QueryId(request.getEbId().getQueryId()));
     if (queryMasterTask != null) {
@@ -209,6 +184,38 @@ public class QueryMasterManagerService extends 
CompositeService
   }
 
   @Override
+  public void getExecutionBlockContext(RpcController controller,
+                                       ExecutionBlockContextRequest request,
+                                       
RpcCallback<ExecutionBlockContextResponse> done) {
+
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+        new QueryId(request.getExecutionBlockId().getQueryId()));
+    if (queryMasterTask != null) {
+
+      Stage stage = queryMasterTask.getQuery().getStage(new 
ExecutionBlockId(request.getExecutionBlockId()));
+
+      // first request with starting ExecutionBlock
+      PlanProto.ShuffleType shuffleType = 
stage.getDataChannel().getShuffleType();
+
+      ExecutionBlockContextResponse.Builder ebRequestProto = 
ExecutionBlockContextResponse.newBuilder();
+      ebRequestProto.setExecutionBlockId(request.getExecutionBlockId())
+          .setQueryContext(stage.getContext().getQueryContext().getProto())
+          .setQueryOutputPath(stage.getContext().getStagingDir().toString())
+          .setPlanJson(CoreGsonHelper.toJson(stage.getBlock().getPlan(), 
LogicalNode.class))
+          .setShuffleType(shuffleType);
+
+      //Set assigned worker to stage
+      if 
(!stage.getAssignedWorkerMap().containsKey(request.getWorker().getId())) {
+        stage.getAssignedWorkerMap().put(request.getWorker().getId(),
+            NetUtils.createSocketAddr(request.getWorker().getHost(), 
request.getWorker().getPeerRpcPort()));
+      }
+      done.run(ebRequestProto.build());
+    } else {
+      controller.setFailed("Can't find query. request: " + request);
+    }
+  }
+
+  @Override
   public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto 
request,
                         RpcCallback<PrimitiveProtos.NullProto> done) {
     QueryId queryId = new QueryId(request);
@@ -221,7 +228,7 @@ public class QueryMasterManagerService extends 
CompositeService
 
   @Override
   public void executeQuery(RpcController controller,
-                           TajoWorkerProtocol.QueryExecutionRequestProto 
request,
+                           QueryExecutionRequest request,
                            RpcCallback<PrimitiveProtos.NullProto> done) {
     workerContext.getWorkerSystemMetrics().counter("querymaster", 
"numQuery").inc();
 
@@ -231,7 +238,22 @@ public class QueryMasterManagerService extends 
CompositeService
         new Session(request.getSession()),
         new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
             request.getQueryContext()), request.getExprInJson().getValue(),
-        request.getLogicalPlanJson().getValue()));
+        request.getLogicalPlanJson().getValue(), new 
NodeResource(request.getAllocation().getResource())));
     done.run(TajoWorker.NULL_PROTO);
   }
+
+  @Override
+  public void allocateQueryMaster(RpcController controller,
+                               AllocationResourceProto request,
+                               RpcCallback<PrimitiveProtos.BoolProto> done) {
+    CallFuture<PrimitiveProtos.BoolProto> callFuture = new 
CallFuture<PrimitiveProtos.BoolProto>();
+    workerContext.getNodeResourceManager().handle(new 
QMResourceAllocateEvent(request, callFuture));
+
+    try {
+      done.run(callFuture.get());
+    } catch (Exception e) {
+      controller.setFailed(e.getMessage());
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
 }

Reply via email to