http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 1f5e7a3..61e2dea 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.querymaster;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,11 +37,9 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.TajoContainerProxy;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
@@ -48,20 +48,29 @@ import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.rpc.*;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.session.Session;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.FormatProperty;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.metrics.TajoMetrics;
 import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
-import org.apache.tajo.worker.AbstractResourceAllocator;
-import org.apache.tajo.worker.TajoResourceAllocator;
+import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
+import org.apache.tajo.worker.event.NodeResourceEvent;
+import org.apache.tajo.worker.event.NodeStatusEvent;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.tajo.TajoProtos.QueryState;
+import static org.apache.tajo.ResourceProtos.*;
 
 public class QueryMasterTask extends CompositeService {
   private static final Log LOG = 
LogFactory.getLog(QueryMasterTask.class.getName());
@@ -90,20 +99,21 @@ public class QueryMasterTask extends CompositeService {
 
   private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
 
-  private AbstractResourceAllocator resourceAllocator;
-
-  private AtomicBoolean stopped = new AtomicBoolean(false);
+  private volatile boolean isStopped;
 
   private TajoMetrics queryMetrics;
 
   private Throwable initError;
 
-  private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics =
-      new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
+  private NodeResource allocation;
+
+  private final List<TaskFatalErrorReport> diagnostics = new 
ArrayList<TaskFatalErrorReport>();
+
+  private final ConcurrentMap<Integer, WorkerConnectionInfo> workerMap = 
Maps.newConcurrentMap();
 
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
                          QueryId queryId, Session session, QueryContext 
queryContext,
-                         String jsonExpr, AsyncDispatcher dispatcher) {
+                         String jsonExpr, NodeResource allocation, 
AsyncDispatcher dispatcher) {
 
     super(QueryMasterTask.class.getName());
     this.queryMasterContext = queryMasterContext;
@@ -111,90 +121,90 @@ public class QueryMasterTask extends CompositeService {
     this.session = session;
     this.queryContext = queryContext;
     this.jsonExpr = jsonExpr;
+    this.allocation = allocation;
     this.querySubmitTime = System.currentTimeMillis();
     this.dispatcher = dispatcher;
   }
 
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
                          QueryId queryId, Session session, QueryContext 
queryContext,
-                         String jsonExpr) {
-    this(queryMasterContext, queryId, session, queryContext, jsonExpr, new 
AsyncDispatcher());
+                         String jsonExpr,
+                         NodeResource allocation) {
+    this(queryMasterContext, queryId, session, queryContext, jsonExpr, 
allocation, new AsyncDispatcher());
   }
 
   @Override
-  public void init(Configuration conf) {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("conf should be a TajoConf type.");
-    }
-    systemConf = (TajoConf)conf;
+  public void serviceInit(Configuration conf) throws Exception {
 
-    try {
-      queryTaskContext = new QueryMasterTaskContext();
-      String resourceManagerClassName = 
systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);
+    systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
 
-      
if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) 
>= 0) {
-        resourceAllocator = new TajoResourceAllocator(queryTaskContext);
-      } else {
-        throw new UnimplementedException(resourceManagerClassName + " is not 
supported yet");
-      }
-      addService(resourceAllocator);
-      addService(dispatcher);
+    queryTaskContext = new QueryMasterTaskContext();
 
-      dispatcher.register(StageEventType.class, new StageEventDispatcher());
-      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
-      dispatcher.register(TaskAttemptEventType.class, new 
TaskAttemptEventDispatcher());
-      dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new 
QueryFinishEventHandler());
-      dispatcher.register(TaskSchedulerEvent.EventType.class, new 
TaskSchedulerDispatcher());
-      dispatcher.register(LocalTaskEventType.class, new 
LocalTaskEventHandler());
+    addService(dispatcher);
 
-      queryMetrics = new TajoMetrics(queryId.toString());
+    dispatcher.register(StageEventType.class, new StageEventDispatcher());
+    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+    dispatcher.register(TaskAttemptEventType.class, new 
TaskAttemptEventDispatcher());
+    dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new 
QueryFinishEventHandler());
+    dispatcher.register(TaskSchedulerEvent.EventType.class, new 
TaskSchedulerDispatcher());
+    dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
 
-      super.init(systemConf);
+    try {
+      queryMetrics = new TajoMetrics(queryId.toString());
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
       initError = t;
     }
+    super.serviceInit(systemConf);
   }
 
   public boolean isStopped() {
-    return stopped.get();
+    return isStopped;
   }
 
   @Override
-  public void start() {
+  public void serviceStart() throws Exception {
     startQuery();
-    super.start();
+    List<TajoProtos.WorkerConnectionInfoProto> workersProto = 
queryMasterContext.getQueryMaster().getAllWorker();
+    for (TajoProtos.WorkerConnectionInfoProto worker : workersProto) {
+      workerMap.put(worker.getId(), new WorkerConnectionInfo(worker));
+    }
+    super.serviceStart();
   }
 
   @Override
-  public void stop() {
-
-    if(stopped.getAndSet(true)) {
-      return;
-    }
+  public void serviceStop() throws Exception {
+    isStopped = true;
 
     LOG.info("Stopping QueryMasterTask:" + queryId);
 
-    try {
-      resourceAllocator.stop();
-    } catch (Throwable t) {
-      LOG.fatal(t.getMessage(), t);
+    //release QM resource
+    EventHandler handler = 
getQueryTaskContext().getQueryMasterContext().getWorkerContext().
+        getNodeResourceManager().getDispatcher().getEventHandler();
+
+    handler.handle(new NodeResourceDeallocateEvent(allocation, 
NodeResourceEvent.ResourceType.QUERY_MASTER));
+
+    //flush current node resource
+    handler.handle(new 
NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
+
+    if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) {
+      cleanupQuery(getQueryId());
     }
 
     if (queryMetrics != null) {
-      queryMetrics.report(new MetricsConsoleReporter());
+      queryMasterContext.getEventExecutor().submit(new Runnable() {
+        @Override
+        public void run() {
+          queryMetrics.report(new MetricsConsoleReporter());
+        }
+      });
     }
 
-    super.stop();
+    super.serviceStop();
     LOG.info("Stopped QueryMasterTask:" + queryId);
   }
 
-  public void handleTaskRequestEvent(TaskRequestEvent event) {
-    ExecutionBlockId id = event.getExecutionBlockId();
-    query.getStage(id).handleTaskRequestEvent(event);
-  }
-
-  public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) 
{
+  public void handleTaskFailed(TaskFatalErrorReport report) {
     synchronized(diagnostics) {
       if (diagnostics.size() < 10) {
         diagnostics.add(report);
@@ -204,7 +214,7 @@ public class QueryMasterTask extends CompositeService {
     getEventHandler().handle(new TaskFatalErrorEvent(report));
   }
 
-  public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() {
+  public Collection<TaskFatalErrorReport> getDiagnostics() {
     synchronized(diagnostics) {
       return Collections.unmodifiableCollection(diagnostics);
     }
@@ -252,33 +262,53 @@ public class QueryMasterTask extends CompositeService {
     }
   }
 
+  /**
+   * It sends a kill RPC request to a corresponding worker.
+   *
+   * @param workerId worker unique Id.
+   * @param taskAttemptId The TaskAttemptId to be killed.
+   */
+  protected void killTaskAttempt(int workerId, TaskAttemptId taskAttemptId) {
+    NettyClientBase tajoWorkerRpc;
+    ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId();
+    InetSocketAddress workerAddress = 
getQuery().getStage(ebId).getAssignedWorkerMap().get(workerId);
+
+    try {
+      tajoWorkerRpc = RpcClientManager.getInstance().getClient(workerAddress, 
TajoWorkerProtocol.class, true);
+      TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = 
tajoWorkerRpc.getStub();
+      CallFuture<PrimitiveProtos.BoolProto> callFuture = new 
CallFuture<PrimitiveProtos.BoolProto>();
+      tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), 
callFuture);
+
+      if(!callFuture.get().getValue()){
+        queryMasterContext.getEventHandler().handle(
+            new TaskFatalErrorEvent(taskAttemptId, "Can't kill task :" + 
taskAttemptId));
+      }
+    } catch (Exception e) {
+      /* Node RPC failure */
+      LOG.error(e.getMessage(), e);
+      queryMasterContext.getEventHandler().handle(new 
TaskFatalErrorEvent(taskAttemptId, e.getMessage()));
+    }
+  }
+
   private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
     @Override
-    public void handle(LocalTaskEvent event) {
-      TajoContainerProxy proxy = (TajoContainerProxy) 
resourceAllocator.getContainers().get(event.getContainerId());
-      if (proxy != null) {
-        proxy.killTaskAttempt(event.getTaskAttemptId());
-      }
+    public void handle(final LocalTaskEvent event) {
+      queryMasterContext.getEventExecutor().submit(new Runnable() {
+        @Override
+        public void run() {
+          killTaskAttempt(event.getWorkerId(), event.getTaskAttemptId());
+        }
+      });
     }
   }
 
   private class QueryFinishEventHandler implements 
EventHandler<QueryMasterQueryCompletedEvent> {
+
     @Override
     public void handle(QueryMasterQueryCompletedEvent event) {
-      QueryId queryId = event.getQueryId();
-      LOG.info("Query completion notified from " + queryId);
-
-      while (!isTerminatedState(query.getSynchronizedState())) {
-        try {
-          synchronized (this) {
-            wait(10);
-          }
-        } catch (InterruptedException e) {
-          LOG.error(e);
-        }
-      }
-      LOG.info("Query final state: " + query.getSynchronizedState());
 
+      QueryId queryId = event.getQueryId();
+      LOG.info("Query completion notified from " + queryId + " final state: " 
+ query.getSynchronizedState());
       queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId));
     }
   }
@@ -441,6 +471,29 @@ public class QueryMasterTask extends CompositeService {
     return this.querySubmitTime;
   }
 
+  private void cleanupQuery(final QueryId queryId) {
+    Set<InetSocketAddress> workers = Sets.newHashSet();
+    for (Stage stage : getQuery().getStages()) {
+      workers.addAll(stage.getAssignedWorkerMap().values());
+    }
+
+    LOG.info("Cleanup resources of all workers. Query: " + queryId + ", 
workers: " + workers.size());
+    for (final InetSocketAddress worker : workers) {
+      queryMasterContext.getEventExecutor().submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            AsyncRpcClient rpc = 
RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, 
true);
+            TajoWorkerProtocol.TajoWorkerProtocolService 
tajoWorkerProtocolService = rpc.getStub();
+            tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), 
NullCallback.get());
+          } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+          }
+        }
+      });
+    }
+  }
+
   public class QueryMasterTaskContext {
     EventHandler eventHandler;
     public QueryMaster.QueryMasterContext getQueryMasterContext() {
@@ -501,12 +554,15 @@ public class QueryMasterTask extends CompositeService {
       return query.getProgress();
     }
 
-    public AbstractResourceAllocator getResourceAllocator() {
-      return resourceAllocator;
-    }
-
     public TajoMetrics getQueryMetrics() {
       return queryMetrics;
     }
+
+    /**
+     * A key is worker id, and a value is a worker connection information.
+     */
+    public ConcurrentMap<Integer, WorkerConnectionInfo> getWorkerMap() {
+      return workerMap;
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index f30fb64..50b84d4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -40,8 +40,8 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import 
org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanRewriteUtil;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
-import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
-import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
+import 
org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.util.PlannerUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 1163a6e..50b45a8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -20,15 +20,12 @@ package org.apache.tajo.querymaster;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
-import org.apache.hadoop.yarn.util.Records;
 import org.apache.tajo.*;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
@@ -38,27 +35,24 @@ import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import 
org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
-import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
-import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
-import org.apache.tajo.master.LaunchTaskRunnersEvent;
-import org.apache.tajo.master.TaskRunnerGroupEvent;
-import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import 
org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
+import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
 import org.apache.tajo.master.TaskState;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.*;
 import 
org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
+import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.FileTablespace;
 import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tablespace;
@@ -70,8 +64,8 @@ import org.apache.tajo.util.history.TaskHistory;
 import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -80,6 +74,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.tajo.conf.TajoConf.ConfVars;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
+import static org.apache.tajo.ResourceProtos.*;
 
 
 /**
@@ -107,16 +102,12 @@ public class Stage implements EventHandler<StageEvent> {
   private volatile long lastContactTime;
   private Thread timeoutChecker;
 
-  volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>();
-  volatile Map<TajoContainerId, TajoContainer> containers = new 
ConcurrentHashMap<TajoContainerId,
-    TajoContainer>();
+  private final Map<TaskId, Task> tasks = Maps.newConcurrentMap();
+  private final Map<Integer, InetSocketAddress> workerMap = 
Maps.newConcurrentMap();
 
   private static final DiagnosticsUpdateTransition 
DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new 
InternalErrorTransition();
-  private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = 
new ContainerLaunchTransition();
   private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new 
TaskCompletedTransition();
-  private static final AllocatedContainersCancelTransition 
CONTAINERS_CANCEL_TRANSITION =
-      new AllocatedContainersCancelTransition();
   private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = 
new StageCompleteTransition();
   private static final StageFinalizeTransition STAGE_FINALIZE_TRANSITION = new 
StageFinalizeTransition();
   private StateMachine<StageState, StageEventType, StageEvent> stateMachine;
@@ -142,8 +133,7 @@ public class Stage implements EventHandler<StageEvent> {
 
           // Transitions from INITED state
           .addTransition(StageState.INITED, StageState.RUNNING,
-              StageEventType.SQ_CONTAINER_ALLOCATED,
-              CONTAINER_LAUNCH_TRANSITION)
+              StageEventType.SQ_START)
           .addTransition(StageState.INITED, StageState.INITED,
               StageEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
@@ -153,10 +143,7 @@ public class Stage implements EventHandler<StageEvent> {
               StageEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
-          // Transitions from RUNNING state
-          .addTransition(StageState.RUNNING, StageState.RUNNING,
-              StageEventType.SQ_CONTAINER_ALLOCATED,
-              CONTAINER_LAUNCH_TRANSITION)
+              // Transitions from RUNNING state
           .addTransition(StageState.RUNNING, StageState.RUNNING,
               StageEventType.SQ_TASK_COMPLETED,
               TASK_COMPLETED_TRANSITION)
@@ -185,10 +172,8 @@ public class Stage implements EventHandler<StageEvent> {
 
           // Transitions from KILL_WAIT state
           .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
-              EnumSet.of(StageEventType.SQ_START, 
StageEventType.SQ_CONTAINER_ALLOCATED),
-              CONTAINERS_CANCEL_TRANSITION)
-          .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
-              EnumSet.of(StageEventType.SQ_KILL), new KillTasksTransition())
+              EnumSet.of(StageEventType.SQ_KILL),
+              new KillTasksTransition())
           .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
               StageEventType.SQ_TASK_COMPLETED,
               TASK_COMPLETED_TRANSITION)
@@ -205,6 +190,9 @@ public class Stage implements EventHandler<StageEvent> {
           .addTransition(StageState.KILL_WAIT, StageState.ERROR,
               StageEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+              // Ignore-able events
+          .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
+              EnumSet.of(StageEventType.SQ_START))
 
               // Transitions from FINALIZING state
           .addTransition(StageState.FINALIZING, StageState.FINALIZING,
@@ -226,9 +214,6 @@ public class Stage implements EventHandler<StageEvent> {
 
               // Transitions from SUCCEEDED state
           .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
-              StageEventType.SQ_CONTAINER_ALLOCATED,
-              CONTAINERS_CANCEL_TRANSITION)
-          .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
               StageEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(StageState.SUCCEEDED, StageState.ERROR,
@@ -239,14 +224,10 @@ public class Stage implements EventHandler<StageEvent> {
               EnumSet.of(
                   StageEventType.SQ_START,
                   StageEventType.SQ_KILL,
-                  StageEventType.SQ_CONTAINER_ALLOCATED,
                   StageEventType.SQ_SHUFFLE_REPORT))
 
           // Transitions from KILLED state
           .addTransition(StageState.KILLED, StageState.KILLED,
-              StageEventType.SQ_CONTAINER_ALLOCATED,
-              CONTAINERS_CANCEL_TRANSITION)
-          .addTransition(StageState.KILLED, StageState.KILLED,
               StageEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(StageState.KILLED, StageState.ERROR,
@@ -257,16 +238,12 @@ public class Stage implements EventHandler<StageEvent> {
               EnumSet.of(
                   StageEventType.SQ_START,
                   StageEventType.SQ_KILL,
-                  StageEventType.SQ_CONTAINER_ALLOCATED,
                   StageEventType.SQ_SHUFFLE_REPORT,
                   StageEventType.SQ_STAGE_COMPLETED,
                   StageEventType.SQ_FAILED))
 
           // Transitions from FAILED state
           .addTransition(StageState.FAILED, StageState.FAILED,
-              StageEventType.SQ_CONTAINER_ALLOCATED,
-              CONTAINERS_CANCEL_TRANSITION)
-          .addTransition(StageState.FAILED, StageState.FAILED,
               StageEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(StageState.FAILED, StageState.ERROR,
@@ -277,14 +254,10 @@ public class Stage implements EventHandler<StageEvent> {
               EnumSet.of(
                   StageEventType.SQ_START,
                   StageEventType.SQ_KILL,
-                  StageEventType.SQ_CONTAINER_ALLOCATED,
                   StageEventType.SQ_FAILED))
 
           // Transitions from ERROR state
           .addTransition(StageState.ERROR, StageState.ERROR,
-              StageEventType.SQ_CONTAINER_ALLOCATED,
-              CONTAINERS_CANCEL_TRANSITION)
-          .addTransition(StageState.ERROR, StageState.ERROR,
               StageEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
           // Ignore-able transitions
@@ -294,18 +267,19 @@ public class Stage implements EventHandler<StageEvent> {
                   StageEventType.SQ_KILL,
                   StageEventType.SQ_FAILED,
                   StageEventType.SQ_INTERNAL_ERROR,
-                  StageEventType.SQ_STAGE_COMPLETED))
+                  StageEventType.SQ_STAGE_COMPLETED,
+                  StageEventType.SQ_SHUFFLE_REPORT))
 
           .installTopology();
 
   private final Lock readLock;
   private final Lock writeLock;
 
-  private int totalScheduledObjectsCount;
-  private int completedTaskCount = 0;
-  private int succeededObjectCount = 0;
-  private int killedObjectCount = 0;
-  private int failedObjectCount = 0;
+  private volatile int totalScheduledObjectsCount;
+  private volatile int completedTaskCount = 0;
+  private volatile int succeededObjectCount = 0;
+  private volatile int killedObjectCount = 0;
+  private volatile int failedObjectCount = 0;
   private TaskSchedulerContext schedulerContext;
   private List<IntermediateEntry> hashShuffleIntermediateEntries = 
Lists.newArrayList();
   private AtomicInteger completedShuffleTasks = new AtomicInteger(0);
@@ -695,16 +669,60 @@ public class Stage implements EventHandler<StageEvent> {
   }
 
   private void stopScheduler() {
-    // If there are launched TaskRunners, send the 'shouldDie' message to all r
-    // via received task requests.
     if (taskScheduler != null) {
       taskScheduler.stop();
     }
   }
 
-  private void releaseContainers() {
-    // If there are still live TaskRunners, try to kill the containers. and 
send the shuffle report request
-    eventHandler.handle(new 
TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), 
containers.values()));
+  /**
+   * Get the launched worker address
+   */
+  protected Map<Integer, InetSocketAddress> getAssignedWorkerMap() {
+    return workerMap;
+  }
+
+  private void sendStopExecutionBlockEvent(final StopExecutionBlockRequest 
requestProto) {
+
+    for (final InetSocketAddress worker : getAssignedWorkerMap().values()) {
+      getContext().getQueryMasterContext().getEventExecutor().submit(new 
Runnable() {
+        @Override
+        public void run() {
+          try {
+            AsyncRpcClient tajoWorkerRpc =
+                RpcClientManager.getInstance().getClient(worker, 
TajoWorkerProtocol.class, true);
+            TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = 
tajoWorkerRpc.getStub();
+            tajoWorkerRpcClient.stopExecutionBlock(null,
+                requestProto, 
NullCallback.get(PrimitiveProtos.BoolProto.class));
+          } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Sends stopping request to all worker
+   */
+  protected void stopExecutionBlock() {
+    // If there are still live tasks, try to kill the tasks. and send the 
shuffle report request
+
+    List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
+    if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) {
+      List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
+
+      for (ExecutionBlock executionBlock : childs) {
+        ebIds.add(executionBlock.getId().getProto());
+      }
+    }
+
+    StopExecutionBlockRequest.Builder stopRequest = 
StopExecutionBlockRequest.newBuilder();
+    ExecutionBlockListProto.Builder cleanupList = 
ExecutionBlockListProto.newBuilder();
+
+    cleanupList.addAllExecutionBlockId(Lists.newArrayList(ebIds));
+    stopRequest.setCleanupList(cleanupList.build());
+    stopRequest.setExecutionBlockId(getId().getProto());
+    sendStopExecutionBlockEvent(stopRequest.build());
   }
 
   /**
@@ -769,10 +787,6 @@ public class Stage implements EventHandler<StageEvent> {
     }
   }
 
-  public void handleTaskRequestEvent(TaskRequestEvent event) {
-    taskScheduler.handleTaskRequestEvent(event);
-  }
-
   private static class InitAndRequestContainer implements 
MultipleArcTransition<Stage,
       StageEvent, StageState> {
 
@@ -794,7 +808,7 @@ public class Stage implements EventHandler<StageEvent> {
           setShuffleIfNecessary(stage, channel);
           initTaskScheduler(stage);
           // execute pre-processing asyncronously
-          stage.getContext().getQueryMasterContext().getEventExecutor()
+          stage.getContext().getQueryMasterContext().getSingleEventExecutor()
               .submit(new Runnable() {
                         @Override
                         public void run() {
@@ -809,7 +823,7 @@ public class Stage implements EventHandler<StageEvent> {
                             } else {
                               if(stage.getSynchronizedState() == 
StageState.INITED) {
                                 stage.taskScheduler.start();
-                                allocateContainers(stage);
+                                stage.eventHandler.handle(new 
StageEvent(stage.getId(), StageEventType.SQ_START));
                               } else {
                                 /* all tasks are killed before stage are 
inited */
                                 if (stage.getTotalScheduledObjectsCount() == 
stage.getCompletedTaskCount()) {
@@ -1047,33 +1061,6 @@ public class Stage implements EventHandler<StageEvent> {
       }
     }
 
-    public static void allocateContainers(Stage stage) {
-      ExecutionBlock execBlock = stage.getBlock();
-
-      //TODO consider disk slot
-      int requiredMemoryMBPerTask = 512;
-
-      int numRequest = 
stage.getContext().getResourceAllocator().calculateNumRequestContainers(
-          stage.getContext().getQueryMasterContext().getWorkerContext(),
-          stage.schedulerContext.getEstimatedTaskNum(),
-          requiredMemoryMBPerTask
-      );
-
-      final Resource resource = Records.newRecord(Resource.class);
-
-      resource.setMemory(requiredMemoryMBPerTask);
-
-      LOG.info("Request Container for " + stage.getId() + " containers=" + 
numRequest);
-
-      Priority priority = Records.newRecord(Priority.class);
-      priority.setPriority(stage.getPriority());
-      ContainerAllocationEvent event =
-          new 
ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
-              stage.getId(), priority, resource, numRequest,
-              stage.masterPlan.isLeaf(execBlock), 0.0f);
-      stage.eventHandler.handle(event);
-    }
-
     private static void scheduleFragmentsForLeafQuery(Stage stage) throws 
IOException {
       ExecutionBlock execBlock = stage.getBlock();
       ScanNode[] scans = execBlock.getScanNodes();
@@ -1082,8 +1069,6 @@ public class Stage implements EventHandler<StageEvent> {
       TableDesc table = 
stage.context.getTableDescMap().get(scan.getCanonicalName());
 
       Collection<Fragment> fragments;
-      TableMeta meta = table.getMeta();
-
       Tablespace tablespace = 
TablespaceManager.get(scan.getTableDesc().getUri()).get();
 
       // Depending on scanner node's type, it creates fragments. If scan is for
@@ -1157,69 +1142,6 @@ public class Stage implements EventHandler<StageEvent> {
     return unit;
   }
 
-  private static class ContainerLaunchTransition
-      implements SingleArcTransition<Stage, StageEvent> {
-
-    @Override
-    public void transition(Stage stage, StageEvent event) {
-      if (!(event instanceof StageContainerAllocationEvent)) {
-        throw new IllegalArgumentException("event should be a 
StageContainerAllocationEvent type.");
-      }
-      try {
-        StageContainerAllocationEvent allocationEvent =
-            (StageContainerAllocationEvent) event;
-        for (TajoContainer container : 
allocationEvent.getAllocatedContainer()) {
-          TajoContainerId cId = container.getId();
-          if (stage.containers.containsKey(cId)) {
-            stage.eventHandler.handle(new 
StageDiagnosticsUpdateEvent(stage.getId(),
-                "Duplicated containers are allocated: " + cId.toString()));
-            stage.eventHandler.handle(new StageEvent(stage.getId(), 
StageEventType.SQ_INTERNAL_ERROR));
-          }
-          stage.containers.put(cId, container);
-        }
-        LOG.info("Stage (" + stage.getId() + ") has " + 
stage.containers.size() + " containers!");
-        stage.eventHandler.handle(
-            new LaunchTaskRunnersEvent(stage.getId(), 
allocationEvent.getAllocatedContainer(),
-                stage.getContext().getQueryContext(),
-                CoreGsonHelper.toJson(stage.getBlock().getPlan(), 
LogicalNode.class))
-        );
-
-        stage.eventHandler.handle(new StageEvent(stage.getId(), 
StageEventType.SQ_START));
-      } catch (Throwable t) {
-        stage.eventHandler.handle(new 
StageDiagnosticsUpdateEvent(stage.getId(),
-            ExceptionUtils.getStackTrace(t)));
-        stage.eventHandler.handle(new StageEvent(stage.getId(), 
StageEventType.SQ_INTERNAL_ERROR));
-      }
-    }
-  }
-
-  /**
-   * It is used in KILL_WAIT state against Contained Allocated event.
-   * It just returns allocated containers to resource manager.
-   */
-  private static class AllocatedContainersCancelTransition implements 
SingleArcTransition<Stage, StageEvent> {
-    @Override
-    public void transition(Stage stage, StageEvent event) {
-      if (!(event instanceof StageContainerAllocationEvent)) {
-        throw new IllegalArgumentException("event should be a 
StageContainerAllocationEvent type.");
-      }
-      try {
-        StageContainerAllocationEvent allocationEvent =
-            (StageContainerAllocationEvent) event;
-        stage.eventHandler.handle(
-            new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP,
-                stage.getId(), allocationEvent.getAllocatedContainer()));
-        LOG.info(String.format("[%s] %d allocated containers are canceled",
-            stage.getId().toString(),
-            allocationEvent.getAllocatedContainer().size()));
-      } catch (Throwable t) {
-        stage.eventHandler.handle(new 
StageDiagnosticsUpdateEvent(stage.getId(),
-            ExceptionUtils.getStackTrace(t)));
-        stage.eventHandler.handle(new StageEvent(stage.getId(), 
StageEventType.SQ_INTERNAL_ERROR));
-      }
-    }
-  }
-
   private static class TaskCompletedTransition implements 
SingleArcTransition<Stage, StageEvent> {
 
     @Override
@@ -1236,6 +1158,7 @@ public class Stage implements EventHandler<StageEvent> {
         stage.eventHandler.handle(new StageEvent(stage.getId(), 
StageEventType.SQ_FAILED));
       } else {
         stage.completedTaskCount++;
+        stage.getTaskScheduler().releaseTaskAttempt(task.getLastAttempt());
 
         if (taskEvent.getState() == TaskState.SUCCEEDED) {
           stage.succeededObjectCount++;
@@ -1281,19 +1204,7 @@ public class Stage implements EventHandler<StageEvent> {
 
   private void cleanup() {
     stopScheduler();
-    releaseContainers();
-
-    if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) {
-      List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
-      List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
-
-      for (ExecutionBlock executionBlock : childs) {
-        ebIds.add(executionBlock.getId().getProto());
-      }
-
-      
getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds);
-    }
-
+    stopExecutionBlock();
     this.finalStageHistory = makeStageHistory();
     this.finalStageHistory.setTasks(makeTaskHistories());
   }
@@ -1309,7 +1220,7 @@ public class Stage implements EventHandler<StageEvent> {
   private void finalizeShuffleReport(StageShuffleReportEvent event, 
ShuffleType type) {
     if(!checkIfNeedFinalizing(type)) return;
 
-    TajoWorkerProtocol.ExecutionBlockReport report = event.getReport();
+    ExecutionBlockReport report = event.getReport();
 
     if (!report.getReportSuccess()) {
       stopFinalization();

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index 1da623e..a586e4b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -34,9 +34,8 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
-import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
 import org.apache.tajo.master.TaskState;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
 import 
org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.plan.logical.*;
@@ -57,7 +56,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
+import static org.apache.tajo.ResourceProtos.*;
 
 public class Task implements EventHandler<TaskEvent> {
   /** Class Logger */
@@ -86,9 +85,8 @@ public class Task implements EventHandler<TaskEvent> {
   private TaskAttemptId lastAttemptId;
 
   private TaskAttemptId successfulAttempt;
-  private String succeededHost;
-  private int succeededHostPort;
-  private int succeededPullServerPort;
+
+  private WorkerConnectionInfo succeededWorker;
 
   private int failedAttempts;
   private int finishedAttempts; // finish are total of success, failed and 
killed
@@ -174,6 +172,7 @@ public class Task implements EventHandler<TaskEvent> {
               EnumSet.of(
                   TaskEventType.T_KILL,
                   TaskEventType.T_SCHEDULE,
+                  TaskEventType.T_ATTEMPT_LAUNCHED,
                   TaskEventType.T_ATTEMPT_SUCCEEDED,
                   TaskEventType.T_ATTEMPT_FAILED))
 
@@ -252,7 +251,11 @@ public class Task implements EventHandler<TaskEvent> {
       taskHistory.setState(lastAttempt.getState().toString());
       taskHistory.setProgress(lastAttempt.getProgress());
     }
-    taskHistory.setHostAndPort(succeededHost + ":" + succeededHostPort);
+
+    if(getSucceededWorker() != null) {
+      taskHistory.setHostAndPort(succeededWorker.getHostAndPeerRpcPort());
+    }
+
     taskHistory.setRetryCount(this.getRetryCount());
     taskHistory.setLaunchTime(launchTime);
     taskHistory.setFinishTime(finishTime);
@@ -357,8 +360,8 @@ public class Task implements EventHandler<TaskEvent> {
     return dataLocations;
   }
 
-  public String getSucceededHost() {
-    return succeededHost;
+  public WorkerConnectionInfo getSucceededWorker() {
+    return succeededWorker;
   }
        
        public void addFetches(String tableId, Collection<FetchImpl> fetches) {
@@ -608,9 +611,7 @@ public class Task implements EventHandler<TaskEvent> {
       TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
 
       task.successfulAttempt = attemptEvent.getTaskAttemptId();
-      task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
-      task.succeededHostPort = 
attempt.getWorkerConnectionInfo().getPeerRpcPort();
-      task.succeededPullServerPort = 
attempt.getWorkerConnectionInfo().getPullServerPort();
+      task.succeededWorker = attempt.getWorkerConnectionInfo();
 
       task.finishTask();
       task.eventHandler.handle(new StageTaskEvent(event.getTaskId(), 
TaskState.SUCCEEDED));
@@ -624,11 +625,7 @@ public class Task implements EventHandler<TaskEvent> {
       if (!(event instanceof TaskTAttemptEvent)) {
         throw new IllegalArgumentException("event should be a 
TaskTAttemptEvent type.");
       }
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
       task.launchTime = System.currentTimeMillis();
-      task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
-      task.succeededHostPort = 
attempt.getWorkerConnectionInfo().getPeerRpcPort();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
index c1b9273..6c48d3b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
@@ -23,18 +23,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
+import org.apache.tajo.ResourceProtos.TaskCompletionReport;
+import org.apache.tajo.ResourceProtos.ShuffleFileOutput;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
 import 
org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.querymaster.Task.PullHost;
-import org.apache.tajo.master.container.TajoContainerId;
 
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -43,8 +43,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
-
 public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
 
   private static final Log LOG = LogFactory.getLog(TaskAttempt.class);
@@ -55,7 +53,6 @@ public class TaskAttempt implements 
EventHandler<TaskAttemptEvent> {
   private final Task task;
   final EventHandler eventHandler;
 
-  private TajoContainerId containerId;
   private WorkerConnectionInfo workerConnectionInfo;
   private int expire;
 
@@ -109,6 +106,8 @@ public class TaskAttempt implements 
EventHandler<TaskAttemptEvent> {
           TaskAttemptEventType.TA_DONE, new SucceededTransition())
       .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
           TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, 
TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_ASSIGN_CANCEL, new CancelTransition())
 
       // Transitions from TA_RUNNING state
       .addTransition(TaskAttemptState.TA_RUNNING,
@@ -167,6 +166,10 @@ public class TaskAttempt implements 
EventHandler<TaskAttemptEvent> {
               TaskAttemptEventType.TA_ASSIGNED,
               TaskAttemptEventType.TA_DONE),
           new TaskKilledCompleteTransition())
+
+          // Transitions from TA_FAILED state
+      .addTransition(TaskAttemptState.TA_FAILED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_KILL)
       .installTopology();
 
   private final StateMachine<TaskAttemptState, TaskAttemptEventType, 
TaskAttemptEvent>
@@ -214,10 +217,6 @@ public class TaskAttempt implements 
EventHandler<TaskAttemptEvent> {
     return this.workerConnectionInfo;
   }
 
-  public void setContainerId(TajoContainerId containerId) {
-    this.containerId = containerId;
-  }
-
   public synchronized void setExpireTime(int expire) {
     this.expire = expire;
   }
@@ -311,7 +310,6 @@ public class TaskAttempt implements 
EventHandler<TaskAttemptEvent> {
         throw new IllegalArgumentException("event should be a 
TaskAttemptAssignedEvent type.");
       }
       TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
-      taskAttempt.containerId = castEvent.getContainerId();
       taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
       taskAttempt.eventHandler.handle(
           new TaskTAttemptEvent(taskAttempt.getId(),
@@ -319,6 +317,17 @@ public class TaskAttempt implements 
EventHandler<TaskAttemptEvent> {
     }
   }
 
+  private static class CancelTransition
+      implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+
+      taskAttempt.workerConnectionInfo = null;
+    }
+  }
+
   private static class TaskKilledCompleteTransition implements 
SingleArcTransition<TaskAttempt, TaskAttemptEvent> {
 
     @Override
@@ -396,7 +405,8 @@ public class TaskAttempt implements 
EventHandler<TaskAttemptEvent> {
 
     @Override
     public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) {
-      taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), 
taskAttempt.containerId,
+      taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(),
+          taskAttempt.getWorkerConnectionInfo().getId(),
           LocalTaskEventType.KILL));
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
 
b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
index 58b8a26..15ec845 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java
@@ -27,9 +27,7 @@ public class DefaultResourceCalculator extends 
ResourceCalculator {
 
   @Override
   public int computeAvailableContainers(NodeResource available, NodeResource 
required) {
-    return Math.min(Math.min(
-        available.getMemory() / required.getMemory(),
-        available.getDisks() / required.getDisks()),
+    return Math.min(available.getMemory() / required.getMemory(),
         available.getVirtualCores() / required.getVirtualCores());
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java 
b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
index f51fc07..c7fe55b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java
@@ -183,6 +183,6 @@ public class NodeResource implements 
ProtoObject<TajoProtos.NodeResourceProto>,
 
   @Override
   public String toString() {
-    return "<memory:" + getMemory() + ", disks:" + getDisks() + ", vCores:" + 
getVirtualCores() + ">";
+    return "(Memory:" + getMemory() + ", Disks:" + getDisks() + ", vCores:" + 
getVirtualCores() + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/session/Session.java 
b/tajo-core/src/main/java/org/apache/tajo/session/Session.java
index ea7d29a..0d067dc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/session/Session.java
+++ b/tajo-core/src/main/java/org/apache/tajo/session/Session.java
@@ -32,7 +32,7 @@ import org.apache.tajo.common.ProtoObject;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
+import static org.apache.tajo.ResourceProtos.SessionProto;
 
 public class Session implements SessionConstants, ProtoObject<SessionProto>, 
Cloneable {
   private static final Log LOG = LogFactory.getLog(Session.class);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java 
b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 578b15a..7641320 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.util;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
@@ -31,8 +30,6 @@ import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.history.StageHistory;
 import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.worker.TaskRunner;
-import org.apache.tajo.worker.TaskRunnerHistory;
 
 import java.text.DecimalFormat;
 import java.util.*;
@@ -66,28 +63,6 @@ public class JSPUtil {
     Collections.sort(tasks, new TaskHistoryComparator(sortField, 
"asc".equals(sortOrder)));
   }
 
-  public static void sortTaskRunner(List<TaskRunner> taskRunners) {
-    Collections.sort(taskRunners, new Comparator<TaskRunner>() {
-      @Override
-      public int compare(TaskRunner taskRunner, TaskRunner taskRunner2) {
-        return taskRunner.getId().compareTo(taskRunner2.getId());
-      }
-    });
-  }
-
-  public static void sortTaskRunnerHistory(List<TaskRunnerHistory> histories) {
-    Collections.sort(histories, new Comparator<TaskRunnerHistory>() {
-      @Override
-      public int compare(TaskRunnerHistory h1, TaskRunnerHistory h2) {
-        int value = 
h1.getExecutionBlockId().compareTo(h2.getExecutionBlockId());
-        if(value == 0){
-          return h1.getContainerId().compareTo(h2.getContainerId());
-        }
-        return value;
-      }
-    });
-  }
-
   public static String getElapsedTime(long startTime, long finishTime) {
     if(startTime == 0) {
       return "-";
@@ -221,8 +196,8 @@ public class JSPUtil {
         if("id".equals(sortField)) {
           return task.getId().compareTo(task2.getId());
         } else if("host".equals(sortField)) {
-          String host1 = task.getSucceededHost() == null ? "-" : 
task.getSucceededHost();
-          String host2 = task2.getSucceededHost() == null ? "-" : 
task2.getSucceededHost();
+          String host1 = task.getSucceededWorker() == null ? "-" : 
task.getSucceededWorker().getHost();
+          String host2 = task2.getSucceededWorker() == null ? "-" : 
task2.getSucceededWorker().getHost();
           return host1.compareTo(host2);
         } else if("runTime".equals(sortField)) {
           return compareLong(task.getRunningTime(), task2.getRunningTime());
@@ -235,8 +210,8 @@ public class JSPUtil {
         if("id".equals(sortField)) {
           return task2.getId().compareTo(task.getId());
         } else if("host".equals(sortField)) {
-          String host1 = task.getSucceededHost() == null ? "-" : 
task.getSucceededHost();
-          String host2 = task2.getSucceededHost() == null ? "-" : 
task2.getSucceededHost();
+          String host1 = task.getSucceededWorker() == null ? "-" : 
task.getSucceededWorker().getHost();
+          String host2 = task2.getSucceededWorker() == null ? "-" : 
task2.getSucceededWorker().getHost();
           return host2.compareTo(host1);
         } else if("runTime".equals(sortField)) {
           if(task2.getLaunchTime() == 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java 
b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
index f4719b2..2acd5f6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto;
+import org.apache.tajo.ResourceProtos.TaskHistoryProto;
 import org.apache.tajo.master.QueryInfo;
 import org.apache.tajo.util.Bytes;
 
@@ -192,7 +192,11 @@ public class HistoryReader {
   }
 
   public List<TaskHistory> getTaskHistory(String queryId, String ebId) throws 
IOException {
-    Path queryHistoryFile = getQueryHistoryFilePath(queryId, 0);
+    return getTaskHistory(queryId, ebId, 0);
+  }
+
+  public List<TaskHistory> getTaskHistory(String queryId, String ebId, long 
startTime) throws IOException {
+    Path queryHistoryFile = getQueryHistoryFilePath(queryId, startTime);
     if (queryHistoryFile == null) {
       return new ArrayList<TaskHistory>();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java 
b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index daced3e..c51b4e6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -53,6 +53,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class HistoryWriter extends AbstractService {
   private static final Log LOG = LogFactory.getLog(HistoryWriter.class);
+  public static final String HISTORY_QUERY_REPLICATION = 
"tajo.history.query.replication";
+  public static final String HISTORY_TASK_REPLICATION = 
"tajo.history.task.replication";
+
   public static final String QUERY_LIST = "query-list";
   public static final String QUERY_DETAIL = "query-detail";
   public static final String HISTORY_FILE_POSTFIX = ".hist";
@@ -87,13 +90,15 @@ public class HistoryWriter extends AbstractService {
     if (!(conf instanceof TajoConf)) {
       throw new IllegalArgumentException("conf should be a TajoConf type.");
     }
-    tajoConf = (TajoConf)conf;
+    tajoConf = (TajoConf) conf;
     historyParentPath = tajoConf.getQueryHistoryDir(tajoConf);
     taskHistoryParentPath = tajoConf.getTaskHistoryDir(tajoConf);
     writerThread = new WriterThread();
     historyCleaner = new HistoryCleaner(tajoConf, isMaster);
-    queryReplication = (short) 
tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_REPLICATION);
-    taskReplication = (short) 
tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_TASK_REPLICATION);
+    queryReplication = (short) tajoConf.getInt(HISTORY_QUERY_REPLICATION,
+        FileSystem.get(tajoConf).getDefaultReplication(historyParentPath));
+    taskReplication = (short) tajoConf.getInt(HISTORY_TASK_REPLICATION,
+        FileSystem.get(tajoConf).getDefaultReplication(taskHistoryParentPath));
     super.serviceInit(conf);
   }
 
@@ -144,6 +149,7 @@ public class HistoryWriter extends AbstractService {
       }
     };
     historyQueue.add(future);
+
     synchronized (writerThread) {
       writerThread.notifyAll();
     }
@@ -151,7 +157,7 @@ public class HistoryWriter extends AbstractService {
   }
 
   /* synchronously flush to history file */
-  public synchronized void appendAndSync(History history)
+  public void appendAndSync(History history)
       throws TimeoutException, InterruptedException, IOException {
 
     WriterFuture<WriterHolder> future = appendAndFlush(history);
@@ -227,7 +233,7 @@ public class HistoryWriter extends AbstractService {
         }
 
         try {
-          if (!histories.isEmpty()) {
+          if (!stopped.get() && !histories.isEmpty()) {
             writeHistory(histories);
           } else {
             continue;
@@ -239,25 +245,23 @@ public class HistoryWriter extends AbstractService {
         //clean up history file
 
         // closing previous writer
-        synchronized (taskWriters) {
-          Calendar cal = Calendar.getInstance();
-          cal.add(Calendar.HOUR_OF_DAY, -2);
-          String closeTargetTime = df.format(cal.getTime());
-          List<String> closingTargets = new ArrayList<String>();
-
-          for (String eachWriterTime : taskWriters.keySet()) {
-            if (eachWriterTime.compareTo(closeTargetTime) <= 0) {
-              closingTargets.add(eachWriterTime);
-            }
+        Calendar cal = Calendar.getInstance();
+        cal.add(Calendar.HOUR_OF_DAY, -2);
+        String closeTargetTime = df.format(cal.getTime());
+        List<String> closingTargets = new ArrayList<String>();
+
+        for (String eachWriterTime : taskWriters.keySet()) {
+          if (eachWriterTime.compareTo(closeTargetTime) <= 0) {
+            closingTargets.add(eachWriterTime);
           }
+        }
 
-          for (String eachWriterTime : closingTargets) {
-            WriterHolder writerHolder;
-            writerHolder = taskWriters.remove(eachWriterTime);
-            if (writerHolder != null) {
-              LOG.info("Closing task history file: " + writerHolder.path);
-              IOUtils.cleanup(LOG, writerHolder);
-            }
+        for (String eachWriterTime : closingTargets) {
+          WriterHolder writerHolder;
+          writerHolder = taskWriters.remove(eachWriterTime);
+          if (writerHolder != null) {
+            LOG.info("Closing task history file: " + writerHolder.path);
+            IOUtils.cleanup(LOG, writerHolder);
           }
         }
       }
@@ -267,24 +271,20 @@ public class HistoryWriter extends AbstractService {
     private int drainHistory(Collection<WriterFuture<WriterHolder>> buffer, 
int numElements,
                              long timeoutMillis) throws InterruptedException {
 
-      long deadline = System.currentTimeMillis() + timeoutMillis;
+      long deadline = System.nanoTime() + 
TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
       int added = 0;
       while (added < numElements) {
         added += historyQueue.drainTo(buffer, numElements - added);
         if (added < numElements) { // not enough elements immediately 
available; will have to wait
-          if (deadline <= System.currentTimeMillis()) {
-            break;
-          } else {
-            synchronized (writerThread) {
-              writerThread.wait(deadline - System.currentTimeMillis());
-              if (deadline > System.currentTimeMillis()) {
-                added += historyQueue.drainTo(buffer, numElements - added);
-                break;
-              }
-            }
+          WriterFuture<WriterHolder> e = historyQueue.poll(deadline - 
System.nanoTime(), TimeUnit.NANOSECONDS);
+          if (e == null) {
+            break; // we already waited enough, and there are no more elements 
in sight
           }
+          buffer.add(e);
+          added++;
         }
       }
+
       return added;
     }
 
@@ -427,13 +427,11 @@ public class HistoryWriter extends AbstractService {
     }
 
     private void flushTaskHistories() {
-      synchronized (taskWriters) {
-        for (WriterHolder holder : taskWriters.values()) {
-          try {
-            holder.flush();
-          } catch (IOException e) {
-            LOG.warn(e, e);
-          }
+      for (WriterHolder holder : taskWriters.values()) {
+        try {
+          holder.flush();
+        } catch (IOException e) {
+          LOG.warn(e, e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
deleted file mode 100644
index 68c57f2..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.master.ContainerProxy;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.container.TajoContainerId;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-public abstract class AbstractResourceAllocator extends CompositeService 
implements ResourceAllocator {
-  /**
-   * A key is worker id, and a value is a worker connection information.
-   */
-  protected ConcurrentMap<Integer, WorkerConnectionInfo> workerInfoMap = 
Maps.newConcurrentMap();
-
-  public WorkerConnectionInfo getWorkerConnectionInfo(int workerId) {
-    return workerInfoMap.get(workerId);
-  }
-
-  public void addWorkerConnectionInfo(WorkerConnectionInfo connectionInfo) {
-    workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo);
-  }
-
-  private Map<TajoContainerId, ContainerProxy> containers = 
Maps.newConcurrentMap();
-
-  public AbstractResourceAllocator() {
-    super(AbstractResourceAllocator.class.getName());
-  }
-
-  public void addContainer(TajoContainerId cId, ContainerProxy container) {
-    containers.put(cId, container);
-  }
-
-  public void removeContainer(TajoContainerId cId) {
-    containers.remove(cId);
-  }
-
-  public boolean containsContainer(TajoContainerId cId) {
-    return containers.containsKey(cId);
-  }
-
-  public ContainerProxy getContainer(TajoContainerId cId) {
-    return containers.get(cId);
-  }
-
-  public Map<TajoContainerId, ContainerProxy> getContainers() {
-    return containers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index cbd451d..57bedd2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -34,18 +34,15 @@ import org.apache.tajo.TaskId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.worker.event.ExecutionBlockErrorEvent;
 
 import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -54,14 +51,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ResourceProtos.*;
 import static 
org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface;
 
 public class ExecutionBlockContext {
   /** class logger */
   private static final Log LOG = 
LogFactory.getLog(ExecutionBlockContext.class);
 
-  private TaskRunnerManager manager;
+  protected AtomicInteger runningTasksNum = new AtomicInteger();
   protected AtomicInteger completedTasksNum = new AtomicInteger();
   protected AtomicInteger succeededTasksNum = new AtomicInteger();
   protected AtomicInteger killedTasksNum = new AtomicInteger();
@@ -79,10 +76,8 @@ public class ExecutionBlockContext {
 
   private TajoQueryEngine queryEngine;
   private RpcClientManager connManager;
-  private InetSocketAddress qmMasterAddr;
-  private NettyClientBase client;
+  private AsyncRpcClient queryMasterClient;
   private QueryMasterProtocol.QueryMasterProtocolService.Interface stub;
-  private WorkerConnectionInfo queryMaster;
   private TajoConf systemConf;
   // for the doAs block
   private UserGroupInformation taskOwner;
@@ -96,17 +91,13 @@ public class ExecutionBlockContext {
   // It keeps all of the query unit attempts while a TaskRunner is running.
   private final ConcurrentMap<TaskAttemptId, Task> tasks = 
Maps.newConcurrentMap();
 
-  @Deprecated
-  private final ConcurrentMap<String, TaskRunnerHistory> histories = 
Maps.newConcurrentMap();
+  private final Map<TaskId, TaskHistory> taskHistories = 
Maps.newConcurrentMap();
 
-  private final Map<TaskId, TaskHistory> taskHistories = Maps.newTreeMap();
-
-  public ExecutionBlockContext(TajoWorker.WorkerContext workerContext,
-                               TaskRunnerManager manager, 
RunExecutionBlockRequestProto request) throws IOException {
-    this.manager = manager;
+  public ExecutionBlockContext(TajoWorker.WorkerContext workerContext, 
ExecutionBlockContextResponse request,
+                               AsyncRpcClient queryMasterClient)
+      throws IOException {
     this.executionBlockId = new 
ExecutionBlockId(request.getExecutionBlockId());
     this.connManager = RpcClientManager.getInstance();
-    this.queryMaster = new WorkerConnectionInfo(request.getQueryMaster());
     this.systemConf = workerContext.getConf();
     this.reporter = new Reporter();
     this.defaultFS = 
TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
@@ -120,6 +111,7 @@ public class ExecutionBlockContext {
     this.resource = new ExecutionBlockSharedResource();
     this.workerContext = workerContext;
     this.shuffleType = request.getShuffleType();
+    this.queryMasterClient = queryMasterClient;
   }
 
   public void init() throws Throwable {
@@ -127,8 +119,6 @@ public class ExecutionBlockContext {
     LOG.info("Tajo Root Dir: " + 
systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
     LOG.info("Worker Local Dir: " + 
systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
 
-    this.qmMasterAddr = NetUtils.createSocketAddr(queryMaster.getHost(), 
queryMaster.getQueryMasterPort());
-    LOG.info("QueryMaster Address:" + qmMasterAddr);
 
     UserGroupInformation.setConfiguration(systemConf);
     // TODO - 'load credential' should be implemented
@@ -138,7 +128,7 @@ public class ExecutionBlockContext {
 
     // initialize DFS and LocalFileSystems
     this.taskOwner = taskOwner;
-    this.stub = getRpcClient().getStub();
+    this.stub = queryMasterClient.getStub();
     this.reporter.startReporter();
     // resource intiailization
     try{
@@ -157,12 +147,8 @@ public class ExecutionBlockContext {
     return resource;
   }
 
-  private NettyClientBase getRpcClient()
-      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
-    if (client != null) return client;
-
-    client = connManager.newClient(qmMasterAddr, QueryMasterProtocol.class, 
true);
-    return client;
+  private AsyncRpcClient getRpcClient() {
+    return queryMasterClient;
   }
 
   public Interface getStub() {
@@ -199,7 +185,7 @@ public class ExecutionBlockContext {
     tasks.clear();
     taskHistories.clear();
     resource.release();
-    RpcClientManager.cleanup(client);
+    RpcClientManager.cleanup(queryMasterClient);
   }
 
   public TajoConf getConf() {
@@ -256,21 +242,6 @@ public class ExecutionBlockContext {
     return tasks.get(taskAttemptId);
   }
 
-  @Deprecated
-  public void stopTaskRunner(String id){
-    manager.stopTaskRunner(id);
-  }
-
-  @Deprecated
-  public TaskRunner getTaskRunner(String taskRunnerId){
-    return manager.getTaskRunner(taskRunnerId);
-  }
-
-  @Deprecated
-  public void addTaskHistory(String taskRunnerId, TaskAttemptId quAttemptId, 
TaskHistory taskHistory) {
-    histories.get(taskRunnerId).addTaskHistory(quAttemptId, taskHistory);
-  }
-
   public void addTaskHistory(TaskId taskId, TaskHistory taskHistory) {
     taskHistories.put(taskId, taskHistory);
   }
@@ -287,12 +258,15 @@ public class ExecutionBlockContext {
         .setId(taskAttemptId.getProto())
         .setErrorMessage(message);
 
-    getStub().fatalError(null, builder.build(), NullCallback.get());
-  }
-
-  public TaskRunnerHistory createTaskRunnerHistory(TaskRunner runner){
-    histories.putIfAbsent(runner.getId(), new 
TaskRunnerHistory(runner.getContainerId(), executionBlockId));
-    return histories.get(runner.getId());
+    try {
+      //If QueryMaster does not responding, current execution block should be 
stop
+      CallFuture<PrimitiveProtos.NullProto> callFuture = new 
CallFuture<PrimitiveProtos.NullProto>();
+      getStub().fatalError(callFuture.getController(), builder.build(), 
callFuture);
+      callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+    } catch (Exception e) {
+      getWorkerContext().getTaskManager().getDispatcher().getEventHandler()
+          .handle(new 
ExecutionBlockErrorEvent(taskAttemptId.getTaskId().getExecutionBlockId(), e));
+    }
   }
 
   public TajoWorker.WorkerContext getWorkerContext(){

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 89c3404..07a9ba6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -22,10 +22,10 @@ import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Repartitioner;
+import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.util.TUtil;
 
 import java.net.URI;
@@ -37,7 +37,7 @@ import static 
org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 /**
  * <code>FetchImpl</code> information to indicate the locations of 
intermediate data.
  */
-public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, 
Cloneable {
+public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
   private Task.PullHost host;             // The pull server host information
   private ShuffleType type; // hash or range partition method.
   private ExecutionBlockId executionBlockId;   // The executionBlock id
@@ -57,7 +57,7 @@ public class FetchImpl implements 
ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
     attemptIds = new ArrayList<Integer>();
   }
 
-  public FetchImpl(TajoWorkerProtocol.FetchProto proto) {
+  public FetchImpl(FetchProto proto) {
     this(new Task.PullHost(proto.getHost(), proto.getPort()),
         proto.getType(),
         new ExecutionBlockId(proto.getExecutionBlockId()),
@@ -112,8 +112,8 @@ public class FetchImpl implements 
ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
   }
 
   @Override
-  public TajoWorkerProtocol.FetchProto getProto() {
-    TajoWorkerProtocol.FetchProto.Builder builder = 
TajoWorkerProtocol.FetchProto.newBuilder();
+  public FetchProto getProto() {
+    FetchProto.Builder builder = FetchProto.newBuilder();
 
     builder.setHost(host.getHost());
     builder.setPort(host.getPort());

Reply via email to