http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
index 5c97ba8..16d32d4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
@@ -36,6 +36,7 @@ import static 
org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto;
 /**
  * The history class for TaskRunner processing.
  */
+@Deprecated
 public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
 
   private Service.STATE state;

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 734a8a5..d18a262 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -36,6 +36,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+@Deprecated
 public class TaskRunnerManager extends CompositeService implements 
EventHandler<TaskRunnerEvent> {
   private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
 
@@ -154,14 +155,7 @@ public class TaskRunnerManager extends CompositeService 
implements EventHandler<
 
       if(context == null){
         try {
-          context = new ExecutionBlockContext(getTajoConf(),
-              getWorkerContext(),
-              this,
-              startEvent.getQueryContext(),
-              startEvent.getPlan(),
-              startEvent.getExecutionBlockId(),
-              startEvent.getQueryMaster(),
-              startEvent.getShuffleType());
+          context = new ExecutionBlockContext(getWorkerContext(), this, 
startEvent.getRequest());
           context.init();
         } catch (Throwable e) {
           LOG.fatal(e.getMessage(), e);
@@ -170,7 +164,7 @@ public class TaskRunnerManager extends CompositeService 
implements EventHandler<
         executionBlockContextMap.put(event.getExecutionBlockId(), context);
       }
 
-      TaskRunner taskRunner = new TaskRunner(context, 
startEvent.getContainerId());
+      TaskRunner taskRunner = new TaskRunner(context, 
startEvent.getRequest().getContainerId());
       LOG.info("Start TaskRunner:" + taskRunner.getId());
       taskRunnerMap.put(taskRunner.getId(), taskRunner);
       taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory());

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
new file mode 100644
index 0000000..85d74e2
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStartEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+public class ExecutionBlockStartEvent extends TaskManagerEvent {
+  private TajoWorkerProtocol.RunExecutionBlockRequestProto requestProto;
+
+  public 
ExecutionBlockStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto 
requestProto) {
+    super(EventType.EB_START, new 
ExecutionBlockId(requestProto.getExecutionBlockId()));
+    this.requestProto = requestProto;
+  }
+
+  public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequestProto() {
+    return requestProto;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
new file mode 100644
index 0000000..2b967ab
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/ExecutionBlockStopEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+public class ExecutionBlockStopEvent extends TaskManagerEvent {
+  private TajoWorkerProtocol.ExecutionBlockListProto cleanupList;
+
+  public ExecutionBlockStopEvent(TajoIdProtos.ExecutionBlockIdProto 
executionBlockId,
+                                 TajoWorkerProtocol.ExecutionBlockListProto 
cleanupList) {
+    super(EventType.EB_STOP, new ExecutionBlockId(executionBlockId));
+    this.cleanupList = cleanupList;
+  }
+
+  public TajoWorkerProtocol.ExecutionBlockListProto getCleanupList() {
+    return cleanupList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
index 2f411e8..9a1c106 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java
@@ -24,7 +24,7 @@ import com.google.protobuf.RpcCallback;
 import static 
org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationRequestProto;
 import static 
org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationResponseProto;
 
-public class NodeResourceAllocateEvent extends NodeResourceManagerEvent {
+public class NodeResourceAllocateEvent extends NodeResourceEvent {
 
   private BatchAllocationRequestProto request;
   private RpcCallback<BatchAllocationResponseProto> callback;

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
index a298d77..31d9229 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java
@@ -21,7 +21,7 @@ package org.apache.tajo.worker.event;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.resource.NodeResource;
 
-public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent {
+public class NodeResourceDeallocateEvent extends NodeResourceEvent {
 
   private NodeResource resource;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
new file mode 100644
index 0000000..6fd2e0d
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class NodeResourceEvent extends 
AbstractEvent<NodeResourceEvent.EventType> {
+  //consumer: NodeResourceManager
+  public enum EventType {
+    // producer: TajoWorkerManagerService
+    ALLOCATE,
+    // producer: TaskExecutor
+    DEALLOCATE
+  }
+
+  public NodeResourceEvent(EventType eventType) {
+    super(eventType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
deleted file mode 100644
index bcb3448..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.resource.NodeResource;
-
-public class NodeResourceManagerEvent extends 
AbstractEvent<NodeResourceManagerEvent.EventType> {
-  public enum EventType {
-    ALLOCATE,
-    DEALLOCATE
-  }
-
-  public NodeResourceManagerEvent(EventType eventType) {
-    super(eventType);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
index 58ab74a..9eb8ae9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java
@@ -22,19 +22,16 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.resource.NodeResource;
 
 public class NodeStatusEvent extends AbstractEvent<NodeStatusEvent.EventType> {
-  private final NodeResource resource;
 
+  // consumer: NodeStatusUpdater
   public enum EventType {
+    // producer: NodeResourceManager
     REPORT_RESOURCE,
+    // producer: TaskManager
     FLUSH_REPORTS
   }
 
-  public NodeStatusEvent(EventType eventType, NodeResource resource) {
+  public NodeStatusEvent(EventType eventType) {
     super(eventType);
-    this.resource = resource;
-  }
-
-  public NodeResource getResource() {
-    return resource;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
new file mode 100644
index 0000000..c609c67
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskExecutorEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.TaskAttemptId;
+
+public class TaskExecutorEvent extends 
AbstractEvent<TaskExecutorEvent.EventType> {
+
+  // producer: NodeResourceManager, consumer: TaskExecutorEvent
+  public enum EventType {
+    START,
+    KILL,
+    ABORT
+  }
+
+  private TaskAttemptId taskAttemptId;
+
+  public TaskExecutorEvent(EventType eventType,
+                           TaskAttemptId taskAttemptId) {
+    super(eventType);
+    this.taskAttemptId = taskAttemptId;
+  }
+
+  public TaskAttemptId getTaskId() {
+    return taskAttemptId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
new file mode 100644
index 0000000..39b541b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskManagerEvent.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TaskAttemptId;
+
+public class TaskManagerEvent extends 
AbstractEvent<TaskManagerEvent.EventType> {
+  // producer: NodeResourceManager, consumer: TaskManager
+  public enum EventType {
+    EB_START,
+    EB_STOP
+  }
+
+  private ExecutionBlockId executionBlockId;
+
+  public TaskManagerEvent(EventType eventType,
+                          ExecutionBlockId executionBlockId) {
+    super(eventType);
+    this.executionBlockId = executionBlockId;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
index aac8973..7175251 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
@@ -21,6 +21,7 @@ package org.apache.tajo.worker.event;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.ExecutionBlockId;
 
+@Deprecated
 public class TaskRunnerEvent extends AbstractEvent<TaskRunnerEvent.EventType> {
   public enum EventType {
     START,

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
index 908afa2..9406794 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
@@ -20,48 +20,20 @@ package org.apache.tajo.worker.event;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
-
+@Deprecated
 public class TaskRunnerStartEvent extends TaskRunnerEvent {
 
-  private final QueryContext queryContext;
-  private final WorkerConnectionInfo queryMaster;
-  private final String containerId;
-  private final String plan;
-  private final PlanProto.ShuffleType shuffleType;
-
-  public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster,
-                              ExecutionBlockId executionBlockId,
-                              String containerId,
-                              QueryContext context,
-                              String plan,
-                              PlanProto.ShuffleType shuffleType) {
-    super(EventType.START, executionBlockId);
-    this.queryMaster = queryMaster;
-    this.containerId = containerId;
-    this.queryContext = context;
-    this.plan = plan;
-    this.shuffleType = shuffleType;
-  }
-
-  public WorkerConnectionInfo getQueryMaster() {
-    return queryMaster;
-  }
-
-  public String getContainerId() {
-    return containerId;
-  }
-
-  public QueryContext getQueryContext() {
-    return queryContext;
-  }
+  private final TajoWorkerProtocol.RunExecutionBlockRequestProto request;
 
-  public String getPlan() {
-    return plan;
+  public TaskRunnerStartEvent(TajoWorkerProtocol.RunExecutionBlockRequestProto 
request) {
+    super(EventType.START, new 
ExecutionBlockId(request.getExecutionBlockId()));
+    this.request = request;
   }
 
-  public PlanProto.ShuffleType getShuffleType() {
-    return shuffleType;
+  public TajoWorkerProtocol.RunExecutionBlockRequestProto getRequest() {
+    return request;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
index c8ec20d..297f30c 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
@@ -20,6 +20,7 @@ package org.apache.tajo.worker.event;
 
 import org.apache.tajo.ExecutionBlockId;
 
+@Deprecated
 public class TaskRunnerStopEvent extends TaskRunnerEvent {
 
   public TaskRunnerStopEvent(ExecutionBlockId executionBlockId) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
new file mode 100644
index 0000000..f60e7c4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskStartEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.resource.NodeResource;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto;
+
+public class TaskStartEvent extends TaskExecutorEvent {
+
+  private NodeResource allocatedResource;
+  private TaskRequestProto taskRequest;
+
+  public TaskStartEvent(TaskRequestProto taskRequest,
+                        NodeResource allocatedResource) {
+    super(EventType.START, new TaskAttemptId(taskRequest.getId()));
+    this.taskRequest = taskRequest;
+    this.allocatedResource = allocatedResource;
+  }
+
+  public NodeResource getAllocatedResource() {
+    return allocatedResource;
+  }
+
+  public TaskRequestProto getTaskRequest() {
+    return taskRequest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto 
b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 2324596..715b1e6 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -208,6 +208,7 @@ message TaskAllocationRequestProto {
 message BatchAllocationRequestProto {
     required ExecutionBlockIdProto executionBlockId = 1;
     repeated TaskAllocationRequestProto taskRequest = 2;
+    optional RunExecutionBlockRequestProto executionBlockRequest = 3;  //TODO 
should be refactored
 }
 
 message BatchAllocationResponseProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java 
b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index eca7f6d..0cec3da 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.querymaster;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.tajo.*;
@@ -33,16 +34,25 @@ import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.session.Session;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
 import org.apache.tajo.worker.ExecutionBlockContext;
-import org.apache.tajo.worker.Task;
+import org.apache.tajo.worker.LegacyTaskImpl;
+import org.apache.tajo.worker.TajoWorker;
+import org.apache.tajo.worker.TaskRunnerManager;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -230,7 +240,7 @@ public class TestKillQuery {
     QueryId qid = LocalTajoTestingUtility.newQueryId();
     ExecutionBlockId eid = QueryIdFactory.newExecutionBlockId(qid, 1);
     TaskId tid = QueryIdFactory.newTaskId(eid);
-    TajoConf conf = new TajoConf();
+    final TajoConf conf = new TajoConf();
     TaskRequestImpl taskRequest = new TaskRequestImpl();
 
     taskRequest.set(null, new ArrayList<CatalogProtos.FragmentProto>(),
@@ -238,18 +248,37 @@ public class TestKillQuery {
     taskRequest.setInterQuery();
     TaskAttemptId attemptId = new TaskAttemptId(tid, 1);
 
+    WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
+    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
+        requestProto = 
TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
+
+    requestProto.setExecutionBlockId(eid.getProto())
+        .setQueryMaster(queryMaster.getProto())
+        .setNodeId(queryMaster.getHost()+":" + 
queryMaster.getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    TajoWorker.WorkerContext workerContext = new MockWorkerContext() {
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+    };
+
     ExecutionBlockContext context =
-        new ExecutionBlockContext(conf, null, null, new QueryContext(conf), 
null, eid, null, null);
+        new ExecutionBlockContext(workerContext, null, requestProto.build());
 
-    org.apache.tajo.worker.Task task = new Task("test", 
CommonTestingUtil.getTestDir(), attemptId,
+    org.apache.tajo.worker.Task task = new LegacyTaskImpl("test", 
CommonTestingUtil.getTestDir(), attemptId,
         conf, context, taskRequest);
     task.kill();
-    assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+    assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, 
task.getTaskContext().getState());
     try {
       task.run();
-      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, 
task.getTaskContext().getState());
     } catch (Exception e) {
-      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, 
task.getTaskContext().getState());
     }
   }
 
@@ -271,4 +300,94 @@ public class TestKillQuery {
       super.dispatch(event);
     }
   }
+
+  abstract class MockWorkerContext implements TajoWorker.WorkerContext {
+
+    @Override
+    public QueryMaster getQueryMaster() {
+      return null;
+    }
+
+    public abstract TajoConf getConf();
+
+    @Override
+    public ServiceTracker getServiceTracker() {
+      return null;
+    }
+
+    @Override
+    public QueryMasterManagerService getQueryMasterManagerService() {
+      return null;
+    }
+
+    @Override
+    public TaskRunnerManager getTaskRunnerManager() {
+      return null;
+    }
+
+    @Override
+    public CatalogService getCatalog() {
+      return null;
+    }
+
+    @Override
+    public WorkerConnectionInfo getConnectionInfo() {
+      return null;
+    }
+
+    @Override
+    public String getWorkerName() {
+      return null;
+    }
+
+    @Override
+    public LocalDirAllocator getLocalDirAllocator() {
+      return null;
+    }
+
+    @Override
+    public QueryCoordinatorProtocol.ClusterResourceSummary 
getClusterResource() {
+      return null;
+    }
+
+    @Override
+    public TajoSystemMetrics getWorkerSystemMetrics() {
+      return null;
+    }
+
+    @Override
+    public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+      return null;
+    }
+
+    @Override
+    public HistoryWriter getTaskHistoryWriter() {
+      return null;
+    }
+
+    @Override
+    public HistoryReader getHistoryReader() {
+      return null;
+    }
+
+    @Override
+    public void cleanup(String strPath) {
+
+    }
+
+    @Override
+    public void cleanupTemporalDirectories() {
+
+    }
+
+    @Override
+    public void 
setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary 
clusterResource) {
+
+    }
+
+    @Override
+    public void setNumClusterNodes(int numClusterNodes) {
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
new file mode 100644
index 0000000..9d4e1f3
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+
+import java.io.IOException;
+
+public class MockExecutionBlock extends ExecutionBlockContext {
+
+  public MockExecutionBlock(TajoWorker.WorkerContext workerContext,
+                            TajoWorkerProtocol.RunExecutionBlockRequestProto 
request) throws IOException {
+    super(workerContext, null, request);
+  }
+
+  @Override
+  public void init() throws Throwable {
+    //skip
+  }
+
+  @Override
+  public void fatalError(TaskAttemptId taskAttemptId, String message) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
new file mode 100644
index 0000000..18b9405
--- /dev/null
+++ 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.worker.event.NodeResourceEvent;
+
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+public class MockNodeResourceManager extends NodeResourceManager {
+
+  volatile boolean enableTaskHandlerEvent = true;
+  private final Semaphore barrier;
+
+  public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, 
EventHandler taskEventHandler) {
+    super(dispatcher, taskEventHandler);
+    this.barrier = barrier;
+  }
+
+  @Override
+  public void handle(NodeResourceEvent event) {
+    super.handle(event);
+    barrier.release();
+  }
+
+  @Override
+  protected void 
startExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+    if(enableTaskHandlerEvent) {
+      super.startExecutionBlock(request);
+    }
+  }
+
+  @Override
+  protected void startTask(TajoWorkerProtocol.TaskRequestProto request, 
NodeResource resource) {
+    if(enableTaskHandlerEvent) {
+      super.startTask(request, resource);
+    }
+  }
+
+  /**
+   * skip task execution and deallocation for testing
+   * */
+  public void setTaskHandlerEvent(boolean flag) {
+    enableTaskHandlerEvent = flag;
+  }
+
+  protected static Queue<TajoWorkerProtocol.TaskAllocationRequestProto> 
createTaskRequests(
+      ExecutionBlockId ebId, int memory, int size) {
+
+    Queue<TajoWorkerProtocol.TaskAllocationRequestProto>
+        requestProtoList = new 
LinkedBlockingQueue<TajoWorkerProtocol.TaskAllocationRequestProto>();
+    for (int i = 0; i < size; i++) {
+
+      TaskAttemptId taskAttemptId = 
QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId, i), 0);
+      TajoWorkerProtocol.TaskRequestProto.Builder builder =
+          TajoWorkerProtocol.TaskRequestProto.newBuilder();
+      builder.setId(taskAttemptId.getProto());
+      builder.setShouldDie(true);
+      builder.setOutputTable("");
+      builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
+      builder.setClusteredOutput(false);
+
+
+      
requestProtoList.add(TajoWorkerProtocol.TaskAllocationRequestProto.newBuilder()
+          .setResource(NodeResources.createResource(memory).getProto())
+          .setTaskRequest(builder.build()).build());
+    }
+    return requestProtoList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
index 2d7d0be..dfcfd4f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
@@ -39,9 +39,9 @@ public class MockNodeStatusUpdater extends NodeStatusUpdater {
   private Map<Integer, NodeResource> resources = Maps.newHashMap();
   private MockResourceTracker resourceTracker;
 
-  public MockNodeStatusUpdater(CountDownLatch barrier, WorkerConnectionInfo 
connectionInfo,
+  public MockNodeStatusUpdater(CountDownLatch barrier, 
TajoWorker.WorkerContext workerContext,
                                NodeResourceManager resourceManager) {
-    super(connectionInfo, resourceManager);
+    super(workerContext, resourceManager);
     this.barrier = barrier;
     this.resourceTracker = new MockResourceTracker();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
new file mode 100644
index 0000000..f62733f
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.worker.event.TaskExecutorEvent;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+
+public class MockTaskExecutor extends TaskExecutor {
+
+  protected final Semaphore barrier;
+
+  public MockTaskExecutor(Semaphore barrier, TaskManager taskManager, 
EventHandler rmEventHandler) {
+    super(taskManager, rmEventHandler);
+    this.barrier = barrier;
+  }
+
+  @Override
+  public void handle(TaskExecutorEvent event) {
+    super.handle(event);
+    barrier.release();
+  }
+
+  @Override
+  protected Task createTask(final ExecutionBlockContext context, 
TajoWorkerProtocol.TaskRequestProto taskRequest) {
+    final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
+
+    //ignore status changed log
+    final TaskAttemptContext taskAttemptContext = new TaskAttemptContext(null, 
context, taskAttemptId, null, null) {
+      private TajoProtos.TaskAttemptState state;
+
+      @Override
+      public TajoProtos.TaskAttemptState getState() {
+        return state;
+      }
+
+      @Override
+      public void setState(TajoProtos.TaskAttemptState state) {
+        this.state = state;
+      }
+    };
+
+    return new Task() {
+      @Override
+      public void init() throws IOException {
+
+      }
+
+      @Override
+      public void fetch() {
+
+      }
+
+      @Override
+      public void run() throws Exception {
+        taskAttemptContext.stop();
+        taskAttemptContext.setProgress(1.0f);
+        taskAttemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED);
+      }
+
+      @Override
+      public void kill() {
+
+      }
+
+      @Override
+      public void abort() {
+
+      }
+
+      @Override
+      public void cleanup() {
+
+      }
+
+      @Override
+      public boolean hasFetchPhase() {
+        return false;
+      }
+
+      @Override
+      public boolean isProgressChanged() {
+        return false;
+      }
+
+      @Override
+      public boolean isStopped() {
+        return taskAttemptContext.isStopped();
+      }
+
+      @Override
+      public void updateProgress() {
+
+      }
+
+      @Override
+      public TaskAttemptContext getTaskContext() {
+        return taskAttemptContext;
+      }
+
+      @Override
+      public ExecutionBlockContext getExecutionBlockContext() {
+        return context;
+      }
+
+      @Override
+      public TajoWorkerProtocol.TaskStatusProto getReport() {
+        TajoWorkerProtocol.TaskStatusProto.Builder builder = 
TajoWorkerProtocol.TaskStatusProto.newBuilder();
+      builder.setWorkerName("localhost:0");
+      builder.setId(taskAttemptContext.getTaskId().getProto())
+          .setProgress(taskAttemptContext.getProgress())
+          .setState(taskAttemptContext.getState());
+
+      builder.setInputStats(new TableStats().getProto());
+      return builder.build();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
new file mode 100644
index 0000000..678b063
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskManager.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.worker.event.TaskManagerEvent;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+
+public class MockTaskManager extends TaskManager {
+
+  private final Semaphore barrier;
+
+  public MockTaskManager(Semaphore barrier, Dispatcher dispatcher, 
TajoWorker.WorkerContext workerContext, EventHandler rmEventHandler) {
+    super(dispatcher, workerContext, rmEventHandler);
+    this.barrier = barrier;
+  }
+
+  @Override
+  protected ExecutionBlockContext 
createExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
+    try {
+      return new MockExecutionBlock(getWorkerContext(), request);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void stopExecutionBlock(ExecutionBlockContext context,
+                                    TajoWorkerProtocol.ExecutionBlockListProto 
cleanupList) {
+    //skip for testing
+  }
+
+  @Override
+  public void handle(TaskManagerEvent event) {
+    super.handle(event);
+    barrier.release();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
new file mode 100644
index 0000000..e8c2b9c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockWorkerContext.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+
+public abstract class MockWorkerContext implements TajoWorker.WorkerContext {
+  TajoSystemMetrics tajoSystemMetrics;
+
+  @Override
+  public QueryMaster getQueryMaster() {
+    return null;
+  }
+
+  public abstract TajoConf getConf();
+
+  @Override
+  public ServiceTracker getServiceTracker() {
+    return null;
+  }
+
+  @Override
+  public QueryMasterManagerService getQueryMasterManagerService() {
+    return null;
+  }
+
+  @Override
+  public TaskRunnerManager getTaskRunnerManager() {
+    return null;
+  }
+
+  @Override
+  public CatalogService getCatalog() {
+    return null;
+  }
+
+  @Override
+  public WorkerConnectionInfo getConnectionInfo() {
+    return null;
+  }
+
+  @Override
+  public String getWorkerName() {
+    return null;
+  }
+
+  @Override
+  public LocalDirAllocator getLocalDirAllocator() {
+    return null;
+  }
+
+  @Override
+  public QueryCoordinatorProtocol.ClusterResourceSummary getClusterResource() {
+    return null;
+  }
+
+  @Override
+  public TajoSystemMetrics getWorkerSystemMetrics() {
+
+    if (tajoSystemMetrics == null) {
+      tajoSystemMetrics = new TajoSystemMetrics(getConf(), "test-file-group", 
"localhost");
+      tajoSystemMetrics.start();
+    }
+    return tajoSystemMetrics;
+  }
+
+  @Override
+  public HashShuffleAppenderManager getHashShuffleAppenderManager() {
+    return null;
+  }
+
+  @Override
+  public HistoryWriter getTaskHistoryWriter() {
+    return null;
+  }
+
+  @Override
+  public HistoryReader getHistoryReader() {
+    return null;
+  }
+
+  @Override
+  public void cleanup(String strPath) {
+
+  }
+
+  @Override
+  public void cleanupTemporalDirectories() {
+
+  }
+
+  @Override
+  public void 
setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary 
clusterResource) {
+
+  }
+
+  @Override
+  public void setNumClusterNodes(int numClusterNodes) {
+
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 513eb69..65627c1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -104,13 +104,13 @@ public class TestFetcher {
 
   @Test
   public void testAdjustFetchProcess() {
-    assertEquals(0.0f, Task.adjustFetchProcess(0, 0), 0);
-    assertEquals(0.0f, Task.adjustFetchProcess(10, 10), 0);
-    assertEquals(0.05f, Task.adjustFetchProcess(10, 9), 0);
-    assertEquals(0.1f, Task.adjustFetchProcess(10, 8), 0);
-    assertEquals(0.25f, Task.adjustFetchProcess(10, 5), 0);
-    assertEquals(0.45f, Task.adjustFetchProcess(10, 1), 0);
-    assertEquals(0.5f, Task.adjustFetchProcess(10, 0), 0);
+    assertEquals(0.0f, LegacyTaskImpl.adjustFetchProcess(0, 0), 0);
+    assertEquals(0.0f, LegacyTaskImpl.adjustFetchProcess(10, 10), 0);
+    assertEquals(0.05f, LegacyTaskImpl.adjustFetchProcess(10, 9), 0);
+    assertEquals(0.1f, LegacyTaskImpl.adjustFetchProcess(10, 8), 0);
+    assertEquals(0.25f, LegacyTaskImpl.adjustFetchProcess(10, 5), 0);
+    assertEquals(0.45f, LegacyTaskImpl.adjustFetchProcess(10, 1), 0);
+    assertEquals(0.5f, LegacyTaskImpl.adjustFetchProcess(10, 0), 0);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
index 7407acc..2cee7d0 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java
@@ -19,13 +19,15 @@
 package org.apache.tajo.worker;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.tajo.*;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.resource.NodeResources;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
@@ -42,9 +44,15 @@ import static org.junit.Assert.*;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 public class TestNodeResourceManager {
 
-  private NodeResourceManager resourceManager;
-  private MockNodeStatusUpdater statusUpdater;
+  private MockNodeResourceManager resourceManager;
+  private NodeStatusUpdater statusUpdater;
+  private TaskManager taskManager;
+  private TaskExecutor taskExecutor;
   private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private TajoWorker.WorkerContext workerContext;
+
+  private CompositeService service;
   private int taskMemory;
   private TajoConf conf;
 
@@ -61,29 +69,55 @@ public class TestNodeResourceManager {
     
conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 
1);
 
     dispatcher = new AsyncDispatcher();
-    dispatcher.init(conf);
-    dispatcher.start();
-
-    resourceManager = new NodeResourceManager(dispatcher);
-    resourceManager.init(conf);
-    resourceManager.start();
-
-    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
-    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), worker, 
resourceManager);
-    statusUpdater.init(conf);
-    statusUpdater.start();
+    taskDispatcher = new AsyncDispatcher();
+
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
+
+    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, 
workerContext, dispatcher.getEventHandler());
+    taskExecutor = new MockTaskExecutor(new Semaphore(0), taskManager, 
dispatcher.getEventHandler());
+    resourceManager = new MockNodeResourceManager(new Semaphore(0), 
dispatcher, taskDispatcher.getEventHandler());
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), 
workerContext, resourceManager);
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(taskExecutor);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+    };
+
+    service.init(conf);
+    service.start();
   }
 
   @After
   public void tearDown() {
-    resourceManager.stop();
-    statusUpdater.stop();
-    dispatcher.stop();
+    service.stop();
   }
 
   @Test
   public void testNodeResourceAllocateEvent() throws Exception {
     int requestSize = 4;
+    resourceManager.setTaskHandlerEvent(false); //skip task execution
 
     CallFuture<BatchAllocationResponseProto> callFuture  = new 
CallFuture<BatchAllocationResponseProto>();
     BatchAllocationRequestProto.Builder requestProto = 
BatchAllocationRequestProto.newBuilder();
@@ -91,14 +125,14 @@ public class TestNodeResourceManager {
     requestProto.setExecutionBlockId(ebId.getProto());
 
     assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
-    requestProto.addAllTaskRequest(createTaskRequests(taskMemory, 
requestSize));
+    
requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 
taskMemory, requestSize));
 
     dispatcher.getEventHandler().handle(new 
NodeResourceAllocateEvent(requestProto.build(), callFuture));
 
     BatchAllocationResponseProto responseProto = callFuture.get();
     assertNotEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
+    // allocated all
     assertEquals(0, responseProto.getCancellationTaskCount());
-    assertEquals(requestSize, resourceManager.getAllocatedSize());
   }
 
 
@@ -106,6 +140,7 @@ public class TestNodeResourceManager {
   public void testNodeResourceCancellation() throws Exception {
     int requestSize = 
conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
     int overSize = 10;
+    resourceManager.setTaskHandlerEvent(false); //skip task execution
 
     CallFuture<BatchAllocationResponseProto> callFuture = new 
CallFuture<BatchAllocationResponseProto>();
     BatchAllocationRequestProto.Builder requestProto = 
BatchAllocationRequestProto.newBuilder();
@@ -113,18 +148,19 @@ public class TestNodeResourceManager {
     requestProto.setExecutionBlockId(ebId.getProto());
 
     assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
-    requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize 
+ overSize));
+    requestProto.addAllTaskRequest(
+        MockNodeResourceManager.createTaskRequests(ebId, taskMemory, 
requestSize + overSize));
 
     dispatcher.getEventHandler().handle(new 
NodeResourceAllocateEvent(requestProto.build(), callFuture));
     BatchAllocationResponseProto responseProto = callFuture.get();
 
     assertEquals(overSize, responseProto.getCancellationTaskCount());
-    assertEquals(requestSize, resourceManager.getAllocatedSize());
   }
 
   @Test
   public void testNodeResourceDeallocateEvent() throws Exception {
     int requestSize = 4;
+    resourceManager.setTaskHandlerEvent(false); //skip task execution
 
     CallFuture<BatchAllocationResponseProto> callFuture  = new 
CallFuture<BatchAllocationResponseProto>();
     BatchAllocationRequestProto.Builder requestProto = 
BatchAllocationRequestProto.newBuilder();
@@ -132,21 +168,20 @@ public class TestNodeResourceManager {
     requestProto.setExecutionBlockId(ebId.getProto());
 
     assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
-    requestProto.addAllTaskRequest(createTaskRequests(taskMemory, 
requestSize));
+    
requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 
taskMemory, requestSize));
 
     dispatcher.getEventHandler().handle(new 
NodeResourceAllocateEvent(requestProto.build(), callFuture));
 
     BatchAllocationResponseProto responseProto = callFuture.get();
     assertNotEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
     assertEquals(0, responseProto.getCancellationTaskCount());
-    assertEquals(requestSize, resourceManager.getAllocatedSize());
 
     //deallocate
     for(TaskAllocationRequestProto allocationRequestProto : 
requestProto.getTaskRequestList()) {
       // direct invoke handler for testing
       resourceManager.handle(new 
NodeResourceDeallocateEvent(allocationRequestProto.getResource()));
     }
-    assertEquals(0, resourceManager.getAllocatedSize());
+
     assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
   }
 
@@ -154,12 +189,38 @@ public class TestNodeResourceManager {
   public void testParallelRequest() throws Exception {
     final int parallelCount = 
conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2;
     final int taskSize = 100000;
+    resourceManager.setTaskHandlerEvent(true);
+
     final AtomicInteger totalComplete = new AtomicInteger();
     final AtomicInteger totalCanceled = new AtomicInteger();
 
     final ExecutionBlockId ebId = new 
ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
-    final Queue<TaskAllocationRequestProto> totalTasks = 
createTaskRequests(taskMemory, taskSize);
+    final Queue<TaskAllocationRequestProto>
+        totalTasks = MockNodeResourceManager.createTaskRequests(ebId, 
taskMemory, taskSize);
+
+    // first request with starting ExecutionBlock
+    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
+        ebRequestProto = 
TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost() + ":" +
+            workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    TaskAllocationRequestProto task = totalTasks.poll();
+    BatchAllocationRequestProto.Builder requestProto = 
BatchAllocationRequestProto.newBuilder();
+    requestProto.addTaskRequest(task);
+    requestProto.setExecutionBlockId(ebId.getProto());
+    requestProto.setExecutionBlockRequest(ebRequestProto.build());
+    CallFuture<BatchAllocationResponseProto> callFuture = new 
CallFuture<BatchAllocationResponseProto>();
+    dispatcher.getEventHandler().handle(new 
NodeResourceAllocateEvent(requestProto.build(), callFuture));
+    assertTrue(callFuture.get().getCancellationTaskCount() == 0);
+    totalComplete.incrementAndGet();
 
+    // start parallel request
     ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
     List<Future> futureList = Lists.newArrayList();
 
@@ -187,7 +248,6 @@ public class TestNodeResourceManager {
                     totalCanceled.addAndGet(proto.getCancellationTaskCount());
                   } else {
                     complete++;
-                    dispatcher.getEventHandler().handle(new 
NodeResourceDeallocateEvent(task.getResource()));
                   }
                 } catch (Exception e) {
                   fail(e.getMessage());
@@ -209,27 +269,4 @@ public class TestNodeResourceManager {
     executor.shutdown();
     assertEquals(taskSize, totalComplete.get());
   }
-
-  protected static Queue<TaskAllocationRequestProto> createTaskRequests(int 
memory, int size) {
-    Queue<TaskAllocationRequestProto> requestProtoList = new 
LinkedBlockingQueue<TaskAllocationRequestProto>();
-    for (int i = 0; i < size; i++) {
-
-      ExecutionBlockId nullStage = 
QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
-      TaskAttemptId taskAttemptId = 
QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, i), 0);
-
-      TajoWorkerProtocol.TaskRequestProto.Builder builder =
-          TajoWorkerProtocol.TaskRequestProto.newBuilder();
-      builder.setId(taskAttemptId.getProto());
-      builder.setShouldDie(true);
-      builder.setOutputTable("");
-      builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
-      builder.setClusteredOutput(false);
-
-
-      requestProtoList.add(TaskAllocationRequestProto.newBuilder()
-          .setResource(NodeResources.createResource(memory).getProto())
-          .setTaskRequest(builder.build()).build());
-    }
-    return requestProtoList;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
index fb3c77e..af40554 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.event.NodeStatusEvent;
 import org.junit.After;
@@ -37,18 +38,36 @@ public class TestNodeStatusUpdater {
   private MockNodeStatusUpdater statusUpdater;
   private AsyncDispatcher dispatcher;
   private TajoConf conf;
+  private TajoWorker.WorkerContext workerContext;
+
 
   @Before
   public void setup() {
     conf = new TajoConf();
     conf.set(CommonTestingUtil.TAJO_TEST_KEY, 
CommonTestingUtil.TAJO_TEST_TRUE);
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
 
     conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL, 1000);
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
 
-    resourceManager = new NodeResourceManager(dispatcher);
+    resourceManager = new NodeResourceManager(dispatcher, null);
     resourceManager.init(conf);
     resourceManager.start();
   }
@@ -63,27 +82,25 @@ public class TestNodeStatusUpdater {
   @Test(timeout = 20000)
   public void testNodeMembership() throws Exception {
     CountDownLatch barrier = new CountDownLatch(1);
-    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
-    statusUpdater = new MockNodeStatusUpdater(barrier, worker, 
resourceManager);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, 
resourceManager);
     statusUpdater.init(conf);
     statusUpdater.start();
 
     MockNodeStatusUpdater.MockResourceTracker resourceTracker = 
statusUpdater.getResourceTracker();
     barrier.await();
 
-    assertTrue(resourceTracker.getTotalResource().containsKey(worker.getId()));
+    
assertTrue(resourceTracker.getTotalResource().containsKey(workerContext.getConnectionInfo().getId()));
     assertEquals(resourceManager.getTotalResource(),
-        resourceTracker.getTotalResource().get(worker.getId()));
+        
resourceTracker.getTotalResource().get(workerContext.getConnectionInfo().getId()));
 
     assertEquals(resourceManager.getAvailableResource(),
-        resourceTracker.getAvailableResource().get(worker.getId()));
+        
resourceTracker.getAvailableResource().get(workerContext.getConnectionInfo().getId()));
   }
 
   @Test(timeout = 20000)
   public void testPing() throws Exception {
     CountDownLatch barrier = new CountDownLatch(2);
-    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
-    statusUpdater = new MockNodeStatusUpdater(barrier, worker, 
resourceManager);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, 
resourceManager);
     statusUpdater.init(conf);
     statusUpdater.start();
 
@@ -100,16 +117,29 @@ public class TestNodeStatusUpdater {
   @Test(timeout = 20000)
   public void testResourceReport() throws Exception {
     CountDownLatch barrier = new CountDownLatch(2);
-    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
-    statusUpdater = new MockNodeStatusUpdater(barrier, worker, 
resourceManager);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, 
resourceManager);
     statusUpdater.init(conf);
     statusUpdater.start();
 
+    assertEquals(0, statusUpdater.getQueueSize());
     for (int i = 0; i < statusUpdater.getQueueingLimit(); i++) {
-      dispatcher.getEventHandler().handle(new 
NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE,
-          resourceManager.getAvailableResource()));
+      dispatcher.getEventHandler().handle(new 
NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
     }
     barrier.await();
     assertEquals(0, statusUpdater.getQueueSize());
   }
+
+  @Test(timeout = 20000)
+  public void testFlushResourceReport() throws Exception {
+    CountDownLatch barrier = new CountDownLatch(2);
+    statusUpdater = new MockNodeStatusUpdater(barrier, workerContext, 
resourceManager);
+    statusUpdater.init(conf);
+    statusUpdater.start();
+
+    assertEquals(0, statusUpdater.getQueueSize());
+    dispatcher.getEventHandler().handle(new 
NodeStatusEvent(NodeStatusEvent.EventType.FLUSH_REPORTS));
+
+    barrier.await();
+    assertEquals(0, statusUpdater.getQueueSize());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
new file mode 100644
index 0000000..98b187b
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.*;
+import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.history.HistoryReader;
+import org.apache.tajo.util.history.HistoryWriter;
+import org.apache.tajo.util.metrics.TajoSystemMetrics;
+import org.apache.tajo.worker.event.ExecutionBlockStartEvent;
+import org.apache.tajo.worker.event.ExecutionBlockStopEvent;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.junit.Assert.*;
+
+public class TestTaskExecutor {
+
+  private NodeResourceManager resourceManager;
+  private NodeStatusUpdater statusUpdater;
+  private TaskManager taskManager;
+  private TaskExecutor taskExecutor;
+  private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private TajoWorker.WorkerContext workerContext;
+
+  private CompositeService service;
+  private TajoConf conf;
+  private Semaphore barrier;
+  private Semaphore resourceManagerBarrier;
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, 
CommonTestingUtil.TAJO_TEST_TRUE);
+    dispatcher = new AsyncDispatcher();
+    taskDispatcher = new AsyncDispatcher();
+
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
+
+    barrier = new Semaphore(0);
+    resourceManagerBarrier = new Semaphore(0);
+    taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, 
workerContext, dispatcher.getEventHandler());
+    taskExecutor = new TaskExecutor(barrier, taskManager, 
dispatcher.getEventHandler());
+    resourceManager = new MockNodeResourceManager(resourceManagerBarrier, 
dispatcher, taskDispatcher.getEventHandler());
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), 
workerContext, resourceManager);
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(taskExecutor);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+
+
+      @Override
+      protected void serviceStop() throws Exception {
+        workerContext.getWorkerSystemMetrics().stop();
+        super.serviceStop();
+      }
+    };
+
+    service.init(conf);
+    service.start();
+  }
+
+  @After
+  public void tearDown() {
+    service.stop();
+  }
+
+  @Test
+  public void testTaskRequest() throws Exception {
+    int requestSize = 1;
+
+    RunExecutionBlockRequestProto.Builder
+        ebRequestProto = RunExecutionBlockRequestProto.newBuilder();
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost() + ":"
+            + workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    CallFuture<BatchAllocationResponseProto> callFuture  = new 
CallFuture<BatchAllocationResponseProto>();
+    BatchAllocationRequestProto.Builder requestProto = 
BatchAllocationRequestProto.newBuilder();
+    requestProto.setExecutionBlockId(ebId.getProto());
+    requestProto.setExecutionBlockRequest(ebRequestProto.build());
+
+    assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
+    
requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 
10, requestSize));
+
+    dispatcher.getEventHandler().handle(new 
NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+    //verify running task
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(1, taskExecutor.getRunningTasks());
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(0, taskExecutor.getRunningTasks());
+    assertEquals(1, taskExecutor.completeTasks);
+
+    //verify the released resources
+    Thread.sleep(100);
+    assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
+  }
+
+  @Test
+  public void testTaskException() throws Exception {
+    int requestSize = 1;
+
+    RunExecutionBlockRequestProto.Builder
+        ebRequestProto = RunExecutionBlockRequestProto.newBuilder();
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost()+":"
+            + workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    CallFuture<BatchAllocationResponseProto> callFuture  = new 
CallFuture<BatchAllocationResponseProto>();
+    BatchAllocationRequestProto.Builder requestProto = 
BatchAllocationRequestProto.newBuilder();
+    requestProto.setExecutionBlockId(ebId.getProto());
+    requestProto.setExecutionBlockRequest(ebRequestProto.build());
+
+    assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
+    
requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 
10, requestSize));
+
+    taskExecutor.throwException.set(true);
+    dispatcher.getEventHandler().handle(new 
NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+    //verify running task
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(1, taskExecutor.getRunningTasks());
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertEquals(0, taskExecutor.getRunningTasks());
+    assertEquals(0, taskExecutor.completeTasks);
+
+    //verify the released resources
+    Thread.sleep(100);
+    assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
+  }
+
+  class TaskExecutor extends MockTaskExecutor {
+    int completeTasks;
+    AtomicBoolean throwException = new AtomicBoolean();
+
+    public TaskExecutor(Semaphore barrier, TaskManager taskManager, 
EventHandler rmEventHandler) {
+      super(barrier, taskManager, rmEventHandler);
+    }
+
+    @Override
+    protected void stopTask(TaskAttemptId taskId) {
+      super.stopTask(taskId);
+      super.barrier.release();
+    }
+
+    @Override
+    protected Task createTask(final ExecutionBlockContext context, 
TajoWorkerProtocol.TaskRequestProto taskRequest) {
+      final TaskAttemptId taskAttemptId = new 
TaskAttemptId(taskRequest.getId());
+      final TaskAttemptContext taskAttemptContext = new 
TaskAttemptContext(null, context, taskAttemptId, null, null);
+
+      return new Task() {
+        @Override
+        public void init() throws IOException {
+
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+
+        @Override
+        public void fetch() {
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+
+        @Override
+        public void run() throws Exception {
+          Thread.sleep(50);
+
+          if(throwException.get()) throw new RuntimeException();
+
+          taskAttemptContext.stop();
+          taskAttemptContext.setProgress(1.0f);
+          
taskAttemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED);
+          completeTasks++;
+        }
+
+        @Override
+        public void kill() {
+
+        }
+
+        @Override
+        public void abort() {
+
+        }
+
+        @Override
+        public void cleanup() {
+        }
+
+        @Override
+        public boolean hasFetchPhase() {
+          return false;
+        }
+
+        @Override
+        public boolean isProgressChanged() {
+          return false;
+        }
+
+        @Override
+        public boolean isStopped() {
+          return taskAttemptContext.isStopped();
+        }
+
+        @Override
+        public void updateProgress() {
+
+        }
+
+        @Override
+        public TaskAttemptContext getTaskContext() {
+          return taskAttemptContext;
+        }
+
+        @Override
+        public ExecutionBlockContext getExecutionBlockContext() {
+          return context;
+        }
+
+        @Override
+        public TajoWorkerProtocol.TaskStatusProto getReport() {
+          TajoWorkerProtocol.TaskStatusProto.Builder builder = 
TajoWorkerProtocol.TaskStatusProto.newBuilder();
+          builder.setWorkerName("localhost:0");
+          builder.setId(taskAttemptContext.getTaskId().getProto())
+              .setProgress(taskAttemptContext.getProgress())
+              .setState(taskAttemptContext.getState());
+
+          builder.setInputStats(new TableStats().getProto());
+          return builder.build();
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
new file mode 100644
index 0000000..8bca489
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.event.ExecutionBlockStartEvent;
+import org.apache.tajo.worker.event.ExecutionBlockStopEvent;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.junit.Assert.*;
+
+public class TestTaskManager {
+
+  private NodeResourceManager resourceManager;
+  private NodeStatusUpdater statusUpdater;
+  private TaskManager taskManager;
+  private TaskExecutor taskExecutor;
+  private AsyncDispatcher dispatcher;
+  private AsyncDispatcher taskDispatcher;
+  private TajoWorker.WorkerContext workerContext;
+
+  private CompositeService service;
+  private int taskMemory;
+  private TajoConf conf;
+  private Semaphore barrier;
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, 
CommonTestingUtil.TAJO_TEST_TRUE);
+
+    taskMemory = 512;
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4);
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB,
+        taskMemory * 
conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES));
+    conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM, 4);
+    
conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 
1);
+
+    dispatcher = new AsyncDispatcher();
+    taskDispatcher = new AsyncDispatcher();
+
+    workerContext = new MockWorkerContext() {
+      WorkerConnectionInfo workerConnectionInfo;
+
+      @Override
+      public TajoConf getConf() {
+        return conf;
+      }
+
+      @Override
+      public WorkerConnectionInfo getConnectionInfo() {
+        if (workerConnectionInfo == null) {
+          workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
+        }
+        return workerConnectionInfo;
+      }
+    };
+    barrier = new Semaphore(0);
+    taskManager = new MockTaskManager(barrier, taskDispatcher, workerContext, 
dispatcher.getEventHandler());
+    taskExecutor = new MockTaskExecutor(new Semaphore(0), taskManager, 
dispatcher.getEventHandler());
+    resourceManager = new NodeResourceManager(dispatcher, 
taskDispatcher.getEventHandler());
+    statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), 
workerContext, resourceManager);
+
+    service = new CompositeService("MockService") {
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        addIfService(dispatcher);
+        addIfService(taskDispatcher);
+        addIfService(taskManager);
+        addIfService(taskExecutor);
+        addIfService(resourceManager);
+        addIfService(statusUpdater);
+        super.serviceInit(conf);
+      }
+
+
+      @Override
+      protected void serviceStop() throws Exception {
+        workerContext.getWorkerSystemMetrics().stop();
+        super.serviceStop();
+      }
+    };
+
+    service.init(conf);
+    service.start();
+  }
+
+  @After
+  public void tearDown() {
+    service.stop();
+  }
+
+  @Test(timeout = 10000)
+  public void testExecutionBlockStart() throws Exception {
+    int requestSize = 1;
+
+    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
+        ebRequestProto = 
TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost() + ":"
+            + workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    CallFuture<BatchAllocationResponseProto> callFuture  = new 
CallFuture<BatchAllocationResponseProto>();
+    BatchAllocationRequestProto.Builder requestProto = 
BatchAllocationRequestProto.newBuilder();
+    requestProto.setExecutionBlockId(ebId.getProto());
+    requestProto.setExecutionBlockRequest(ebRequestProto.build());
+
+    assertEquals(resourceManager.getTotalResource(), 
resourceManager.getAvailableResource());
+    
requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 
taskMemory, requestSize));
+
+    dispatcher.getEventHandler().handle(new 
NodeResourceAllocateEvent(requestProto.build(), callFuture));
+
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertNotNull(taskManager.getExecutionBlockContext(ebId));
+    assertEquals(ebId, 
taskManager.getExecutionBlockContext(ebId).getExecutionBlockId());
+  }
+
+  @Test(timeout = 10000)
+  public void testExecutionBlockStop() throws Exception {
+
+    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
+        ebRequestProto = 
TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
+
+    ebRequestProto.setExecutionBlockId(ebId.getProto())
+        .setQueryMaster(workerContext.getConnectionInfo().getProto())
+        .setNodeId(workerContext.getConnectionInfo().getHost()+":"
+            + workerContext.getConnectionInfo().getQueryMasterPort())
+        .setContainerId("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setPlanJson("test")
+        .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
+
+    taskDispatcher.getEventHandler().handle(new 
ExecutionBlockStartEvent(ebRequestProto.build()));
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertNotNull(taskManager.getExecutionBlockContext(ebId));
+    assertEquals(ebId, 
taskManager.getExecutionBlockContext(ebId).getExecutionBlockId());
+
+    ExecutionBlockListProto.Builder ebList = 
ExecutionBlockListProto.newBuilder();
+    taskDispatcher.getEventHandler().handle(new 
ExecutionBlockStopEvent(ebId.getProto(), ebList.build()));
+    assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
+    assertNull(taskManager.getExecutionBlockContext(ebId));
+  }
+}

Reply via email to