http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp 
b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
index 5c3ce7b..88b7c24 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
@@ -20,43 +20,30 @@
 <%@ page language="java" contentType="text/html; charset=UTF-8" 
pageEncoding="UTF-8"%>
 
 <%@ page import="org.apache.commons.lang.StringUtils" %>
+<%@ page import="org.apache.tajo.ResourceProtos.FetcherHistoryProto" %>
 <%@ page import="org.apache.tajo.TaskAttemptId" %>
-<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="org.apache.tajo.worker.*" %>
+<%@ page import="org.apache.tajo.worker.Fetcher" %>
+<%@ page import="org.apache.tajo.worker.TajoWorker" %>
+<%@ page import="org.apache.tajo.worker.Task" %>
+<%@ page import="org.apache.tajo.worker.TaskHistory" %>
 <%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="java.util.List" %>
 
 <%
     TajoWorker tajoWorker = (TajoWorker) 
StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
 
-    String containerId = request.getParameter("containerId");
     String quAttemptId = request.getParameter("taskAttemptId");
     TaskAttemptId taskAttemptId = TajoIdUtils.parseTaskAttemptId(quAttemptId);
-    Task task = null;
-    TaskHistory taskHistory = null;
-    if(containerId == null || containerId.isEmpty() || 
"null".equals(containerId)) {
-        task = 
tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskByTaskAttemptId(taskAttemptId);
-        if (task != null) {
-            taskHistory = task.createTaskHistory();
-        } else {
-            taskHistory = 
tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskHistoryByTaskAttemptId(taskAttemptId);
-        }
+
+    TaskHistory taskHistory;
+    Task task = 
tajoWorker.getWorkerContext().getTaskManager().getTaskByTaskAttemptId(taskAttemptId);
+    if (task != null) {
+        taskHistory = task.createTaskHistory();
     } else {
-        TaskRunner runner = 
tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunner(containerId);
-        if(runner != null) {
-            task = runner.getContext().getTask(taskAttemptId);
-            if (task != null) {
-                taskHistory = task.createTaskHistory();
-            } else {
-                TaskRunnerHistory history = 
tajoWorker.getWorkerContext().getTaskRunnerManager().getExcutionBlockHistoryByTaskRunnerId(containerId);
-                if(history != null) {
-                    taskHistory = history.getTaskHistory(taskAttemptId);
-                }
-            }
-        }
+        taskHistory = 
tajoWorker.getWorkerContext().getTaskManager().getTaskHistory(taskAttemptId.getTaskId());
     }
     SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 %>
@@ -112,7 +99,7 @@
         int index = 1;
         int pageSize = 1000; //TODO pagination
 
-        List<TajoWorkerProtocol.FetcherHistoryProto> fetcherHistories = 
taskHistory.getFetcherHistories();
+        List<FetcherHistoryProto> fetcherHistories = 
taskHistory.getFetcherHistories();
         if (fetcherHistories.size() > 0) {
 
     %>
@@ -128,7 +115,7 @@
             <th># Messages</th>
         </tr>
         <%
-            for (TajoWorkerProtocol.FetcherHistoryProto eachFetcher : 
fetcherHistories) {
+            for (FetcherHistoryProto eachFetcher : fetcherHistories) {
         %>
         <tr>
             <td><%=index%>

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp 
b/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp
index b7774e8..5dd52d3 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp
@@ -19,7 +19,7 @@
 %>
 <%@ page language="java" contentType="text/html; charset=UTF-8" 
pageEncoding="UTF-8"%>
 
-<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
+<%@ page import="org.apache.tajo.ResourceProtos.FetcherHistoryProto" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="org.apache.tajo.worker.*" %>
@@ -86,13 +86,13 @@
     int index = 1;
     int pageSize = 1000; //TODO pagination
 
-    List<TajoWorkerProtocol.FetcherHistoryProto> fetcherHistories = 
taskHistory.getFetcherHistories();
+    List<FetcherHistoryProto> fetcherHistories = 
taskHistory.getFetcherHistories();
     if (fetcherHistories.size() > 0) {
 %>
   <table border="1" width="100%" class="border_table">
   
<tr><th>No</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th><th>File
 Length</th><th># Messages</th></tr>
 <%
-      for (TajoWorkerProtocol.FetcherHistoryProto eachFetcher : 
fetcherHistories) {
+      for (FetcherHistoryProto eachFetcher : fetcherHistories) {
 %>
     <tr>
       <td><%=index%></td>

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/tasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/tasks.jsp 
b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
deleted file mode 100644
index ab873cd..0000000
--- a/tajo-core/src/main/resources/webapps/worker/tasks.jsp
+++ /dev/null
@@ -1,107 +0,0 @@
-<%
-    /*
-    * Licensed to the Apache Software Foundation (ASF) under one
-    * or more contributor license agreements. See the NOTICE file
-    * distributed with this work for additional information
-    * regarding copyright ownership. The ASF licenses this file
-    * to you under the Apache License, Version 2.0 (the
-    * "License"); you may not use this file except in compliance
-    * with the License. You may obtain a copy of the License at
-    *
-    * http://www.apache.org/licenses/LICENSE-2.0
-    *
-    * Unless required by applicable law or agreed to in writing, software
-    * distributed under the License is distributed on an "AS IS" BASIS,
-    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    * See the License for the specific language governing permissions and
-    * limitations under the License.
-    */
-%>
-<%@ page language="java" contentType="text/html; charset=UTF-8" 
pageEncoding="UTF-8"%>
-
-<%@ page import="org.apache.tajo.TaskAttemptId" %>
-<%@ page import="org.apache.tajo.util.JSPUtil" %>
-<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="java.text.SimpleDateFormat" %>
-<%@ page import="java.util.Map" %>
-<%@ page import="org.apache.tajo.worker.*" %>
-
-<%
-    String containerId = request.getParameter("taskRunnerId");
-    TajoWorker tajoWorker = (TajoWorker) 
StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
-
-    TaskRunner taskRunner = 
tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunner(containerId);
-    org.apache.tajo.worker.TaskRunnerHistory history = 
tajoWorker.getWorkerContext().getTaskRunnerManager().getExcutionBlockHistoryByTaskRunnerId(containerId);
-    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-%>
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" 
"http://www.w3.org/TR/html4/loose.dtd";>
-<html>
-<head>
-    <link rel="stylesheet" type="text/css" href="/static/style.css"/>
-    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
-    <title>tajo worker</title>
-    <%
-        if (taskRunner == null && history == null) {
-    %>
-    <script type="text/javascript">
-        alert("No Task Container for" + containerId);
-        document.history.back();
-    </script>
-</head>
-</body>
-</html>
-<%
-        return;
-    }
-%>
-</head>
-<body>
-<%@ include file="header.jsp"%>
-<div class='contents'>
-    <h2>Tajo Worker: <a 
href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2>
-    <hr/>
-    <h3>Tasks</h3>
-    <table width="100%" border="1" class="border_table">
-        
<tr><th>Id</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
-        <%
-            if (taskRunner != null) {
-                ExecutionBlockContext context = taskRunner.getContext();
-
-                for (Map.Entry<TaskAttemptId, Task> entry : 
context.getTasks().entrySet()) {
-                    TaskAttemptId taskAttemptId = entry.getKey();
-                    TaskHistory eachTask = 
entry.getValue().createTaskHistory();
-        %>
-                    <tr>
-                        <td>
-                            <a 
href="taskdetail.jsp?containerId=<%=containerId%>&taskAttemptId=<%=taskAttemptId%>"><%=taskAttemptId%></a></td>
-                        <td><%=df.format(eachTask.getStartTime())%></td>
-                        <td><%=eachTask.getFinishTime() == 0 ? "-" : 
df.format(eachTask.getFinishTime())%></td>
-                        <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), 
eachTask.getFinishTime())%></td>
-                        <td><%=eachTask.getState()%></td>
-                    </tr>
-        <%
-                }
-            }
-
-            if (history != null) {
-
-
-                for (Map.Entry<TaskAttemptId, TaskHistory> entry : 
history.getTaskHistoryMap().entrySet()) {
-                    TaskAttemptId taskAttemptId = entry.getKey();
-                    TaskHistory eachTask = entry.getValue();
-        %>
-                        <tr>
-                            <td><a 
href="taskdetail.jsp?containerId=<%=containerId%>&taskAttemptId=<%=taskAttemptId%>"><%=taskAttemptId%></a></td>
-                            <td><%=df.format(eachTask.getStartTime())%></td>
-                            <td><%=eachTask.getFinishTime() == 0 ? "-" : 
df.format(eachTask.getFinishTime())%></td>
-                            
<td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), 
eachTask.getFinishTime())%></td>
-                            <td><%=eachTask.getState()%></td>
-                        </tr>
-        <%
-                }
-            }
-        %>
-    </table>
-</div>
-</body>
-</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java 
b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index a323f25..995d448 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -231,7 +231,7 @@ public class QueryTestCaseBase {
     /* protect a travis stalled build */
     System.out.println("Run: " + name.getMethodName() +
         " Used memory: " + ((Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory())
-        / (1024 * 1024)) + "MBytes");
+        / (1024 * 1024)) + " MBytes, Active Threads:" + Thread.activeCount());
   }
 
   public QueryTestCaseBase() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java 
b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 973f1e8..7be0cab 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -19,7 +19,6 @@
 package org.apache.tajo;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
 import org.apache.commons.lang.StringUtils;
@@ -40,7 +39,6 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import 
org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider;
 import org.apache.tajo.querymaster.Query;
 import org.apache.tajo.querymaster.QueryMasterTask;
@@ -128,16 +126,10 @@ public class TajoTestingCluster {
     conf.setClassVar(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, 
LogicalPlanTestRuleProvider.class);
     conf.setClassVar(ConfVars.GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, 
GlobalPlanTestRuleProvider.class);
 
-
-    // default resource manager
-    if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
-      String testResourceManager = 
System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
-      
Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()));
-      conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, 
System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
-    }
-    conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2048);
-    conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f);
-
+    conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES.varname, 4);
+    conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2000);
+    conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM.varname, 
3);
+    conf.setInt(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM.varname, 
2);
 
     // Client API RPC
     conf.setIntVar(ConfVars.RPC_CLIENT_WORKER_THREAD_NUM, 2);
@@ -145,6 +137,7 @@ public class TajoTestingCluster {
     //Client API service RPC Server
     conf.setIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
     conf.setIntVar(ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
+    conf.setIntVar(ConfVars.REST_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2);
 
     // Internal RPC Client
     conf.setIntVar(ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM, 2);
@@ -157,10 +150,6 @@ public class TajoTestingCluster {
     conf.setIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2);
     conf.setIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2);
 
-    // Resource allocator
-    conf.setIntVar(ConfVars.$QUERY_EXECUTE_PARALLEL_MAX, 3);
-    conf.setIntVar(ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM, 6);   // 
make twice of parallel_max
-
     // Memory cache termination
     conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1);
 
@@ -512,7 +501,8 @@ public class TajoTestingCluster {
     startMiniDFSCluster(numDataNodes, clusterTestBuildDir, dataNodeHosts);
     this.dfsCluster.waitClusterUp();
 
-    conf.setInt("hbase.hconnection.threads.core", 50);
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 50);
     hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir);
 
     startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index ace3d0d..d198326 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -50,7 +50,7 @@ import java.io.IOException;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index dc4dd04..3c9177e 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -50,7 +50,7 @@ import java.io.IOException;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
 import static org.junit.Assert.*;
 
 public class TestFullOuterHashJoinExec {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index 8fd61d0..6b32db0 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -52,7 +52,7 @@ import java.io.IOException;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
 import static org.junit.Assert.*;
 
 public class TestFullOuterMergeJoinExec {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index b9ee06a..f7bc1dd 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -51,7 +51,7 @@ import org.junit.Test;
 import java.io.IOException;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
 import static org.junit.Assert.*;
 
 public class TestHashJoinExec {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index c93a1b4..87bcd20 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -50,7 +50,7 @@ import java.io.IOException;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index c4e7752..d4189bb 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -52,7 +52,7 @@ import org.junit.Test;
 import java.io.IOException;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index dff0cbe..ca4b6b7 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -69,8 +69,8 @@ import java.util.Set;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static 
org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm;
+import static 
org.apache.tajo.plan.serder.PlanProto.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.SortEnforce.SortAlgorithm;
 import static org.junit.Assert.*;
 
 public class TestPhysicalPlanner {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index d86b229..51f0e76 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -50,7 +50,7 @@ import java.io.IOException;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm;
 import static org.junit.Assert.*;
 
 public class TestRightOuterMergeJoinExec {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
index da0f59d..d769c13 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java
@@ -46,6 +46,7 @@ import static org.junit.Assert.assertTrue;
 @Category(IntegrationTest.class)
 @RunWith(Parameterized.class)
 @NamedTest("TestJoinQuery")
[email protected]
 public class TestJoinOnPartitionedTables extends TestJoinQuery {
 
   public TestJoinOnPartitionedTables(String joinOption) throws Exception {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java
index 53dce3c..18eba41 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java
@@ -28,8 +28,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.sql.ResultSet;
-
 @Category(IntegrationTest.class)
 @RunWith(Parameterized.class)
 @NamedTest("TestJoinQuery")

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
 
b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
index e8d59d0..207f64d 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
@@ -18,49 +18,9 @@
 
 package org.apache.tajo.master;
 
-import static org.junit.Assert.*;
-import static org.hamcrest.CoreMatchers.*;
-
-import java.io.File;
-import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.tajo.*;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.benchmark.TPCH;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.client.ResultSetUtil;
-import org.apache.tajo.client.TajoClient;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
-import org.apache.tajo.master.exec.NonForwardQueryResultSystemScanner;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.plan.logical.LimitNode;
-import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.util.KeyValueSet;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeDiagnosingMatcher;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.tajo.QueryTestCaseBase;
 import org.junit.Test;
 
-import com.google.protobuf.ByteString;
-
 public class TestNonForwardQueryResultSystemScanner extends QueryTestCaseBase {
   @Test
   public void testGetNextRowsForAggregateFunction() throws Exception {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java 
b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 9910d79..3c378dd 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -20,29 +20,28 @@ package org.apache.tajo.master;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import io.netty.handler.codec.http.QueryStringDecoder;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.ResourceProtos.FetchProto;
 import org.apache.tajo.TestTajoIds;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
-import org.apache.tajo.querymaster.Repartitioner;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.FetchImpl;
 import org.junit.Test;
 
-import io.netty.handler.codec.http.QueryStringDecoder;
-
 import java.net.URI;
 import java.util.*;
 
 import static junit.framework.Assert.assertEquals;
-import static org.apache.tajo.querymaster.Repartitioner.FetchGroupMeta;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.HASH_SHUFFLE;
 import static 
org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
+import static org.apache.tajo.querymaster.Repartitioner.FetchGroupMeta;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -76,7 +75,7 @@ public class TestRepartitioner {
 
       fetch.setName(sid.toString());
 
-      TajoWorkerProtocol.FetchProto proto = fetch.getProto();
+      FetchProto proto = fetch.getProto();
       fetch = new FetchImpl(proto);
       assertEquals(proto, fetch.getProto());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
 
b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
deleted file mode 100644
index 2c997a3..0000000
--- 
a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
-import static org.junit.Assert.*;
-
-public class TestTajoResourceManager {
-  private final PrimitiveProtos.BoolProto BOOL_TRUE = 
PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
-  private final PrimitiveProtos.BoolProto BOOL_FALSE = 
PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
-
-  TajoConf tajoConf;
-
-  long queryIdTime = System.currentTimeMillis();
-  int numWorkers = 5;
-  float workerDiskSlots = 5.0f;
-  int workerMemoryMB = 512 * 10;
-  WorkerResourceAllocationResponse response;
-
-  private TajoWorkerResourceManager initResourceManager() throws Exception {
-    tajoConf = new org.apache.tajo.conf.TajoConf();
-
-    tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f);
-    tajoConf.setIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB, 512);
-    tajoConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, 
"localhost:0");
-    TajoWorkerResourceManager tajoWorkerResourceManager = new 
TajoWorkerResourceManager(tajoConf);
-    tajoWorkerResourceManager.init(tajoConf);
-    tajoWorkerResourceManager.start();
-
-    for(int i = 0; i < numWorkers; i++) {
-      ServerStatusProto.System system = ServerStatusProto.System.newBuilder()
-          .setAvailableProcessors(1)
-          .setFreeMemoryMB(workerMemoryMB)
-          .setMaxMemoryMB(workerMemoryMB)
-          .setTotalMemoryMB(workerMemoryMB)
-          .build();
-
-      ServerStatusProto.JvmHeap jvmHeap = 
ServerStatusProto.JvmHeap.newBuilder()
-          .setFreeHeap(workerMemoryMB)
-          .setMaxHeap(workerMemoryMB)
-          .setTotalHeap(workerMemoryMB)
-          .build();
-
-      ServerStatusProto.Disk disk = ServerStatusProto.Disk.newBuilder()
-          .setAbsolutePath("/")
-          .setFreeSpace(0)
-          .setTotalSpace(0)
-          .setUsableSpace(0)
-          .build();
-
-      List<ServerStatusProto.Disk> disks = new 
ArrayList<ServerStatusProto.Disk>();
-
-      disks.add(disk);
-
-      ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
-          .setDiskSlots(workerDiskSlots)
-          .setMemoryResourceMB(workerMemoryMB)
-          .setJvmHeap(jvmHeap)
-          .setSystem(system)
-          .addAllDisk(disks)
-          .setRunningTaskNum(0)
-          .build();
-
-      WorkerConnectionInfo connectionInfo =
-          new WorkerConnectionInfo("host" + (i + 1), 28091, 28092, 21000 + i, 
28093, 28080);
-      NodeHeartbeat tajoHeartbeat = NodeHeartbeat.newBuilder()
-          .setConnectionInfo(connectionInfo.getProto())
-          .setServerStatus(serverStatus)
-          .build();
-
-      tajoWorkerResourceManager.getResourceTracker().heartbeat(null, 
tajoHeartbeat, NullCallback.get());
-    }
-
-    return tajoWorkerResourceManager;
-  }
-
-
-  @Test
-  public void testHeartbeat() throws Exception {
-    TajoWorkerResourceManager tajoWorkerResourceManager = null;
-    try {
-      tajoWorkerResourceManager = initResourceManager();
-      assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size());
-      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
-        WorkerResource resource = worker.getResource();
-        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
-        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
-      }
-    } finally {
-      if (tajoWorkerResourceManager != null) {
-        tajoWorkerResourceManager.stop();
-      }
-    }
-  }
-
-  @Test
-  public void testMemoryResource() throws Exception {
-    TajoWorkerResourceManager tajoWorkerResourceManager = null;
-    try {
-      tajoWorkerResourceManager = initResourceManager();
-
-      final int minMemory = 256;
-      final int maxMemory = 512;
-      float diskSlots = 1.0f;
-
-      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 1);
-
-      WorkerResourceAllocationRequest request = 
WorkerResourceAllocationRequest.newBuilder()
-          .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
-          .setNumContainers(60)
-          .setQueryId(queryId.getProto())
-          .setMaxDiskSlotPerContainer(diskSlots)
-          .setMinDiskSlotPerContainer(diskSlots)
-          .setMinMemoryMBPerContainer(minMemory)
-          .setMaxMemoryMBPerContainer(maxMemory)
-          .build();
-
-      final CountDownLatch barrier = new CountDownLatch(1);
-      final List<ContainerProtocol.TajoContainerIdProto> containerIds = new
-        ArrayList<ContainerProtocol.TajoContainerIdProto>();
-
-      RpcCallback<WorkerResourceAllocationResponse> callBack = new 
RpcCallback<WorkerResourceAllocationResponse>() {
-
-        @Override
-        public void run(WorkerResourceAllocationResponse response) {
-          TestTajoResourceManager.this.response = response;
-          barrier.countDown();
-        }
-      };
-
-      tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
-      assertTrue(barrier.await(3, TimeUnit.SECONDS));
-
-
-      // assert after callback
-      int totalUsedMemory = 0;
-      int totalUsedDisks = 0;
-
-      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
-        WorkerResource resource = worker.getResource();
-        assertEquals(0, resource.getAvailableMemoryMB());
-        assertEquals(0, resource.getAvailableDiskSlots(), 0);
-        assertEquals(5.0f, resource.getUsedDiskSlots(), 0);
-
-        totalUsedMemory += resource.getUsedMemoryMB();
-        totalUsedDisks += resource.getUsedDiskSlots();
-      }
-
-      assertEquals(workerMemoryMB * numWorkers, totalUsedMemory);
-      assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
-
-      assertEquals(numWorkers * 10, 
response.getWorkerAllocatedResourceList().size());
-
-      for(WorkerAllocatedResource eachResource: 
response.getWorkerAllocatedResourceList()) {
-        assertTrue(
-            eachResource.getAllocatedMemoryMB() >= minMemory &&  
eachResource.getAllocatedMemoryMB() <= maxMemory);
-        containerIds.add(eachResource.getContainerId());
-      }
-
-      for(ContainerProtocol.TajoContainerIdProto eachContainerId: 
containerIds) {
-        tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
-      }
-
-      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
-        WorkerResource resource = worker.getResource();
-        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
-        assertEquals(0, resource.getUsedMemoryMB());
-
-        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
-        assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
-      }
-    } finally {
-      if (tajoWorkerResourceManager != null) {
-        tajoWorkerResourceManager.stop();
-      }
-    }
-  }
-
-  @Test
-  public void testMemoryNotCommensurable() throws Exception {
-    TajoWorkerResourceManager tajoWorkerResourceManager = null;
-
-    try {
-      tajoWorkerResourceManager = initResourceManager();
-
-      final int minMemory = 200;
-      final int maxMemory = 500;
-      float diskSlots = 1.0f;
-
-      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 2);
-
-      int requiredContainers = 60;
-
-      int numAllocatedContainers = 0;
-
-      int loopCount = 0;
-      while(true) {
-        WorkerResourceAllocationRequest request = 
WorkerResourceAllocationRequest.newBuilder()
-            .setResourceRequestPriority(ResourceRequestPriority.MEMORY)
-            .setNumContainers(requiredContainers - numAllocatedContainers)
-            .setQueryId(queryId.getProto())
-            .setMaxDiskSlotPerContainer(diskSlots)
-            .setMinDiskSlotPerContainer(diskSlots)
-            .setMinMemoryMBPerContainer(minMemory)
-            .setMaxMemoryMBPerContainer(maxMemory)
-            .build();
-
-        final CountDownLatch barrier = new CountDownLatch(1);
-
-        RpcCallback<WorkerResourceAllocationResponse> callBack = new 
RpcCallback<WorkerResourceAllocationResponse>() {
-          @Override
-          public void run(WorkerResourceAllocationResponse response) {
-            TestTajoResourceManager.this.response = response;
-            barrier.countDown();
-          }
-        };
-
-        tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
-
-        assertTrue(barrier.await(3, TimeUnit.SECONDS));
-
-        numAllocatedContainers += 
TestTajoResourceManager.this.response.getWorkerAllocatedResourceList().size();
-
-        //release resource
-        for(WorkerAllocatedResource eachResource:
-            
TestTajoResourceManager.this.response.getWorkerAllocatedResourceList()) {
-          assertTrue(
-              eachResource.getAllocatedMemoryMB() >= minMemory &&  
eachResource.getAllocatedMemoryMB() <= maxMemory);
-          
tajoWorkerResourceManager.releaseWorkerResource(eachResource.getContainerId());
-        }
-
-        for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
-          WorkerResource resource = worker.getResource();
-          assertEquals(0, resource.getUsedMemoryMB());
-          assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
-
-          assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
-          assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
-        }
-
-        loopCount++;
-
-        if(loopCount == 2) {
-          assertEquals(requiredContainers, numAllocatedContainers);
-          break;
-        }
-      }
-
-      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
-        WorkerResource resource = worker.getResource();
-        assertEquals(0, resource.getUsedMemoryMB());
-        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
-
-        assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
-        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
-      }
-    } finally {
-      if (tajoWorkerResourceManager != null) {
-        tajoWorkerResourceManager.stop();
-      }
-    }
-  }
-
-  @Test
-  public void testDiskResource() throws Exception {
-    TajoWorkerResourceManager tajoWorkerResourceManager = null;
-
-    try {
-      tajoWorkerResourceManager = initResourceManager();
-
-      final float minDiskSlots = 1.0f;
-      final float maxDiskSlots = 2.0f;
-      int memoryMB = 256;
-
-      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3);
-
-      WorkerResourceAllocationRequest request = 
WorkerResourceAllocationRequest.newBuilder()
-          .setResourceRequestPriority(ResourceRequestPriority.DISK)
-          .setNumContainers(60)
-          .setQueryId(queryId.getProto())
-          .setMaxDiskSlotPerContainer(maxDiskSlots)
-          .setMinDiskSlotPerContainer(minDiskSlots)
-          .setMinMemoryMBPerContainer(memoryMB)
-          .setMaxMemoryMBPerContainer(memoryMB)
-          .build();
-
-      final CountDownLatch barrier = new CountDownLatch(1);
-      final List<ContainerProtocol.TajoContainerIdProto> containerIds = new
-        ArrayList<ContainerProtocol.TajoContainerIdProto>();
-
-
-      RpcCallback<WorkerResourceAllocationResponse> callBack = new 
RpcCallback<WorkerResourceAllocationResponse>() {
-
-        @Override
-        public void run(WorkerResourceAllocationResponse response) {
-          TestTajoResourceManager.this.response = response;
-          barrier.countDown();
-        }
-      };
-
-      tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
-      assertTrue(barrier.await(3, TimeUnit.SECONDS));
-
-      for(WorkerAllocatedResource eachResource: 
response.getWorkerAllocatedResourceList()) {
-        assertTrue("AllocatedDiskSlot:" + eachResource.getAllocatedDiskSlots(),
-            eachResource.getAllocatedDiskSlots() >= minDiskSlots &&
-                eachResource.getAllocatedDiskSlots() <= maxDiskSlots);
-        containerIds.add(eachResource.getContainerId());
-      }
-
-      // assert after callback
-      int totalUsedDisks = 0;
-      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
-        WorkerResource resource = worker.getResource();
-        //each worker allocated 3 container (2 disk slot = 2, 1 disk slot = 1)
-        assertEquals(0, resource.getAvailableDiskSlots(), 0);
-        assertEquals(5.0f, resource.getUsedDiskSlots(), 0);
-        assertEquals(256 * 3, resource.getUsedMemoryMB());
-
-        totalUsedDisks += resource.getUsedDiskSlots();
-      }
-
-      assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0);
-
-      assertEquals(numWorkers * 3, 
response.getWorkerAllocatedResourceList().size());
-
-      for(ContainerProtocol.TajoContainerIdProto eachContainerId: 
containerIds) {
-        tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
-      }
-
-      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
-        WorkerResource resource = worker.getResource();
-        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
-        assertEquals(0, resource.getUsedMemoryMB());
-
-        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
-        assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
-      }
-    } finally {
-      if (tajoWorkerResourceManager != null) {
-        tajoWorkerResourceManager.stop();
-      }
-    }
-  }
-
-  @Test
-  public void testDiskResourceWithStoppedQuery() throws Exception {
-    TajoWorkerResourceManager tajoWorkerResourceManager = null;
-
-    try {
-      tajoWorkerResourceManager = initResourceManager();
-
-      final float minDiskSlots = 1.0f;
-      final float maxDiskSlots = 2.0f;
-      int memoryMB = 256;
-
-      QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3);
-
-      WorkerResourceAllocationRequest request = 
WorkerResourceAllocationRequest.newBuilder()
-          .setResourceRequestPriority(ResourceRequestPriority.DISK)
-          .setNumContainers(60)
-          .setQueryId(queryId.getProto())
-          .setMaxDiskSlotPerContainer(maxDiskSlots)
-          .setMinDiskSlotPerContainer(minDiskSlots)
-          .setMinMemoryMBPerContainer(memoryMB)
-          .setMaxMemoryMBPerContainer(memoryMB)
-          .build();
-
-      final CountDownLatch barrier = new CountDownLatch(1);
-      final List<ContainerProtocol.TajoContainerIdProto> containerIds = new
-        ArrayList<ContainerProtocol.TajoContainerIdProto>();
-
-
-      RpcCallback<WorkerResourceAllocationResponse> callBack = new 
RpcCallback<WorkerResourceAllocationResponse>() {
-
-        @Override
-        public void run(WorkerResourceAllocationResponse response) {
-          TestTajoResourceManager.this.response = response;
-          barrier.countDown();
-        }
-      };
-
-      
tajoWorkerResourceManager.getRMContext().getStoppedQueryIds().add(queryId);
-      tajoWorkerResourceManager.allocateWorkerResources(request, callBack);
-      assertFalse(barrier.await(3, TimeUnit.SECONDS));
-
-      assertNull(response);
-
-      // assert after callback
-      int totalUsedDisks = 0;
-      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
-        WorkerResource resource = worker.getResource();
-        //each worker allocated 3 container (2 disk slot = 2, 1 disk slot = 1)
-        assertEquals(5.0f, resource.getAvailableDiskSlots(), 0);
-        assertEquals(0, resource.getUsedDiskSlots(), 0);
-        assertEquals(0, resource.getUsedMemoryMB());
-
-        totalUsedDisks += resource.getUsedDiskSlots();
-      }
-
-      assertEquals(0, totalUsedDisks, 0);
-
-      for(ContainerProtocol.TajoContainerIdProto eachContainerId: 
containerIds) {
-        tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
-      }
-
-      for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
-        WorkerResource resource = worker.getResource();
-        assertEquals(workerMemoryMB, resource.getAvailableMemoryMB());
-        assertEquals(0, resource.getUsedMemoryMB());
-
-        assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0);
-        assertEquals(0.0f, resource.getUsedDiskSlots(), 0);
-      }
-    } finally {
-      if (tajoWorkerResourceManager != null) {
-        tajoWorkerResourceManager.stop();
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java
 
b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java
deleted file mode 100644
index 874461f..0000000
--- 
a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.scheduler;
-
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.benchmark.TPCH;
-import org.apache.tajo.client.TajoClient;
-import org.apache.tajo.client.TajoClientUtil;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ClientProtos;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.sql.ResultSet;
-
-import static org.junit.Assert.*;
-
-public class TestFifoScheduler {
-  private static TajoTestingCluster cluster;
-  private static TajoConf conf;
-  private static TajoClient client;
-  private static String query =
-      "select l_orderkey, l_partkey from lineitem group by l_orderkey, 
l_partkey order by l_orderkey";
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    cluster = new TajoTestingCluster();
-    cluster.startMiniClusterInLocal(1);
-    conf = cluster.getConfiguration();
-    client = cluster.newTajoClient();
-    File file = TPCH.getDataFile("lineitem");
-    client.executeQueryAndGetResult("create external table default.lineitem 
(l_orderkey int, l_partkey int) "
-        + "using text location 'file://" + file.getAbsolutePath() + "'");
-    assertTrue(client.existTable("default.lineitem"));
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    if (client != null) client.close();
-    if (cluster != null) cluster.shutdownMiniCluster();
-  }
-
-  @Test
-  public final void testKillScheduledQuery() throws Exception {
-    ClientProtos.SubmitQueryResponse res = client.executeQuery(query);
-    ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query);
-    QueryId queryId = new QueryId(res.getQueryId());
-    QueryId queryId2 = new QueryId(res2.getQueryId());
-
-    cluster.waitForQuerySubmitted(queryId);
-    client.killQuery(queryId2);
-    assertEquals(TajoProtos.QueryState.QUERY_KILLED, 
client.getQueryStatus(queryId2).getState());
-  }
-
-  @Test
-  public final void testForwardedQuery() throws Exception {
-    ClientProtos.SubmitQueryResponse res = client.executeQuery(query);
-    ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from 
lineitem limit 1");
-    assertTrue(res.getIsForwarded());
-    assertFalse(res2.getIsForwarded());
-
-    QueryId queryId = new QueryId(res.getQueryId());
-    QueryId queryId2 = new QueryId(res2.getQueryId());
-    cluster.waitForQuerySubmitted(queryId);
-
-    assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, 
client.getQueryStatus(queryId2).getState());
-    ResultSet resSet = TajoClientUtil.createResultSet(client, res2, 1);
-    assertNotNull(resSet);
-  }
-
-  @Test
-  public final void testScheduledQuery() throws Exception {
-    ClientProtos.SubmitQueryResponse res = client.executeQuery("select 
sleep(1) from lineitem");
-    ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query);
-    ClientProtos.SubmitQueryResponse res3 = client.executeQuery(query);
-    ClientProtos.SubmitQueryResponse res4 = client.executeQuery(query);
-
-    QueryId queryId = new QueryId(res.getQueryId());
-    QueryId queryId2 = new QueryId(res2.getQueryId());
-    QueryId queryId3 = new QueryId(res3.getQueryId());
-    QueryId queryId4 = new QueryId(res4.getQueryId());
-
-    cluster.waitForQuerySubmitted(queryId);
-
-    
assertFalse(TajoClientUtil.isQueryComplete(client.getQueryStatus(queryId).getState()));
-
-    assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, 
client.getQueryStatus(queryId2).getState());
-    assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, 
client.getQueryStatus(queryId3).getState());
-    assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, 
client.getQueryStatus(queryId4).getState());
-
-    client.killQuery(queryId4);
-    client.killQuery(queryId3);
-    client.killQuery(queryId2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
 
b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
new file mode 100644
index 0000000..36fd939
--- /dev/null
+++ 
b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.*;
+import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent;
+import org.apache.tajo.master.scheduler.event.SchedulerEvent;
+import org.apache.tajo.master.scheduler.event.SchedulerEventType;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.rpc.CallFuture;
+import org.junit.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.apache.tajo.ResourceProtos.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@NotThreadSafe
+public class TestSimpleScheduler {
+  private CompositeService service;
+  private SimpleScheduler scheduler;
+  private TajoRMContext rmContext;
+  private AsyncDispatcher dispatcher;
+  private TajoConf conf;
+  private int workerNum = 3;
+  private NodeResource nodeResource;
+  private NodeResource totalResource;
+  private Semaphore barrier;
+  private int testDelay = 50;
+  private static ScheduledExecutorService executorService;
+
+  @BeforeClass
+  public static void setupClass() {
+    executorService = Executors.newScheduledThreadPool(10);
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+    executorService.shutdown();
+  }
+
+  @Before
+  public void setup() {
+    conf = new TajoConf();
+    nodeResource = NodeResource.createResource(1500, 2, 3);
+    service = new CompositeService(TestSimpleScheduler.class.getSimpleName()) {
+
+      @Override
+      protected void serviceInit(Configuration conf) throws Exception {
+        dispatcher = new AsyncDispatcher();
+        addService(dispatcher);
+
+        rmContext = new TajoRMContext(dispatcher);
+        rmContext.getDispatcher().register(NodeEventType.class,
+            new TajoResourceManager.WorkerEventDispatcher(rmContext));
+
+        barrier = new Semaphore(0);
+        scheduler = new MySimpleScheduler(rmContext, barrier);
+        addService(scheduler);
+        rmContext.getDispatcher().register(SchedulerEventType.class, 
scheduler);
+
+        for (int i = 0; i < workerNum; i++) {
+          WorkerConnectionInfo conn = new WorkerConnectionInfo("host" + i, 
28091 + i, 28092, 21000, 28093, 28080);
+          rmContext.getNodes().putIfAbsent(conn.getId(),
+              new NodeStatus(rmContext, NodeResources.clone(nodeResource), 
conn));
+          rmContext.getDispatcher().getEventHandler().handle(new 
NodeEvent(conn.getId(), NodeEventType.STARTED));
+        }
+        super.serviceInit(conf);
+      }
+    };
+    service.init(conf);
+    service.start();
+
+    assertEquals(workerNum, rmContext.getNodes().size());
+    totalResource = NodeResources.createResource(0);
+    for(NodeStatus nodeStatus : rmContext.getNodes().values()) {
+      NodeResources.addTo(totalResource, 
nodeStatus.getTotalResourceCapability());
+    }
+  }
+
+  @After
+  public void tearDown() {
+    service.stop();
+  }
+
+  @Test
+  public void testInitialCapacity() throws InterruptedException {
+    assertEquals(workerNum, scheduler.getNumClusterNodes());
+    assertEquals(0, scheduler.getRunningQuery());
+
+    assertEquals(totalResource, scheduler.getMaximumResourceCapability());
+    assertEquals(totalResource, scheduler.getClusterResource());
+
+    assertEquals(TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY.defaultIntVal,
+        scheduler.getQMMinimumResourceCapability().getMemory());
+
+    assertEquals(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY.defaultIntVal,
+        scheduler.getMinimumResourceCapability().getMemory());
+  }
+
+  @Test(timeout = 10000)
+  public void testSubmitOneQuery() throws InterruptedException {
+    QuerySchedulingInfo schedulingInfo = new QuerySchedulingInfo("default",
+        "user",
+        QueryIdFactory.newQueryId(System.nanoTime(), 0),
+        1,
+        System.currentTimeMillis());
+
+    assertEquals(0, scheduler.getRunningQuery());
+
+    scheduler.submitQuery(schedulingInfo);
+    barrier.acquire();
+    assertEquals(1, scheduler.getRunningQuery());
+
+    assertEquals(totalResource, scheduler.getMaximumResourceCapability());
+    assertEquals(totalResource,
+        NodeResources.add(scheduler.getQMMinimumResourceCapability(), 
scheduler.getClusterResource()));
+  }
+
+  @Test(timeout = 10000)
+  public void testMaximumSubmitQuery() throws InterruptedException {
+    assertEquals(0, scheduler.getRunningQuery());
+    int maximumParallelQuery = 
scheduler.getResourceCalculator().computeAvailableContainers(
+        scheduler.getMaximumResourceCapability(), 
scheduler.getQMMinimumResourceCapability());
+
+    int testParallelNum = 10;
+    for (int i = 0; i < testParallelNum; i++) {
+      QuerySchedulingInfo schedulingInfo = new QuerySchedulingInfo("default",
+          "user",
+          QueryIdFactory.newQueryId(System.nanoTime(), 0),
+          1,
+          System.currentTimeMillis());
+      scheduler.submitQuery(schedulingInfo);
+    }
+
+    barrier.acquire();
+    // allow 50% parallel running
+    assertEquals(Math.floor(maximumParallelQuery * 0.5f), (double) 
scheduler.getRunningQuery(), 1.0f);
+    assertEquals(testParallelNum, scheduler.getRunningQuery() + 
scheduler.getQueryQueue().size());
+  }
+
+  @Test(timeout = 10000)
+  public void testReserveResource() throws InterruptedException, 
ExecutionException {
+    int requestNum = 3;
+    assertEquals(totalResource, scheduler.getMaximumResourceCapability());
+    assertEquals(totalResource, scheduler.getClusterResource());
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.nanoTime(), 0);
+    CallFuture<NodeResourceResponse> callBack = new 
CallFuture<NodeResourceResponse>();
+    rmContext.getDispatcher().getEventHandler().handle(new 
ResourceReserveSchedulerEvent(
+        createResourceRequest(queryId, requestNum, new ArrayList<Integer>()), 
callBack));
+
+    NodeResourceResponse responseProto = callBack.get();
+    assertEquals(queryId, new QueryId(responseProto.getQueryId()));
+    assertEquals(requestNum, responseProto.getResourceCount());
+
+    NodeResource allocations = NodeResources.createResource(0);
+    for (AllocationResourceProto resourceProto : 
responseProto.getResourceList()) {
+      NodeResources.addTo(allocations, new 
NodeResource(resourceProto.getResource()));
+    }
+
+    assertEquals(NodeResources.subtract(totalResource, allocations), 
scheduler.getClusterResource());
+  }
+
+  @Test(timeout = 10000)
+  public void testReserveResourceWithWorkerPriority() throws 
InterruptedException, ExecutionException {
+    int requestNum = 2;
+    assertEquals(totalResource, scheduler.getMaximumResourceCapability());
+    assertEquals(totalResource, scheduler.getClusterResource());
+
+    List<Integer> targetWorkers = Lists.newArrayList();
+    Map.Entry<Integer, NodeStatus> workerEntry = 
rmContext.getNodes().entrySet().iterator().next();
+    targetWorkers.add(workerEntry.getKey());
+
+    NodeResource expectResource = 
NodeResources.multiply(scheduler.getMinimumResourceCapability(), requestNum);
+    assertTrue(NodeResources.fitsIn(expectResource, 
workerEntry.getValue().getAvailableResource()));
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.nanoTime(), 0);
+    NodeResourceRequest requestProto = createResourceRequest(queryId, 
requestNum, targetWorkers);
+    CallFuture<NodeResourceResponse> callBack = new 
CallFuture<NodeResourceResponse>();
+    rmContext.getDispatcher().getEventHandler().handle(new 
ResourceReserveSchedulerEvent(
+        requestProto, callBack));
+
+    NodeResourceResponse responseProto = callBack.get();
+    assertEquals(queryId, new QueryId(responseProto.getQueryId()));
+    assertEquals(requestNum, responseProto.getResourceCount());
+
+    for (AllocationResourceProto resourceProto : 
responseProto.getResourceList()) {
+      assertEquals(workerEntry.getKey().intValue(), 
resourceProto.getWorkerId());
+    }
+  }
+
+  private NodeResourceRequest
+  createResourceRequest(QueryId queryId, int containerNum, List<Integer> 
candidateWorkers) {
+    NodeResourceRequest.Builder request =
+        NodeResourceRequest.newBuilder();
+    request.setCapacity(scheduler.getMinimumResourceCapability().getProto())
+        .setNumContainers(containerNum)
+        .setPriority(1)
+        .setQueryId(queryId.getProto())
+        .setType(ResourceType.LEAF)
+        .setUserId("test user")
+        .setRunningTasks(0)
+        .addAllCandidateNodes(candidateWorkers)
+        .setQueue("default");
+    return request.build();
+  }
+
+  class MySimpleScheduler extends SimpleScheduler {
+    Semaphore barrier;
+    Map<QueryId, QueryInfo> queryInfoMap = Maps.newHashMap();
+    Map<QueryId, AllocationResourceProto> qmAllocationMap = Maps.newHashMap();
+
+    public MySimpleScheduler(TajoRMContext rmContext, Semaphore barrier) {
+      super(null, rmContext);
+      this.barrier = barrier;
+    }
+
+    @Override
+    public void submitQuery(QuerySchedulingInfo schedulingInfo) {
+      queryInfoMap.put(schedulingInfo.getQueryId(), new 
QueryInfo(schedulingInfo.getQueryId()) {
+        QueryContext context;
+        @Override
+        public QueryContext getQueryContext() {
+          if(context == null) {
+            context = new QueryContext(conf);
+            context.setUser("user");
+          }
+          return context;
+        }
+      });
+      super.submitQuery(schedulingInfo);
+    }
+
+    @Override
+    protected boolean startQuery(final QueryId queryId, final 
AllocationResourceProto allocation) {
+      executorService.schedule(new Runnable() {
+        @Override
+        public void run() {
+          barrier.release();
+          qmAllocationMap.put(queryId, allocation);
+          rmContext.getDispatcher().getEventHandler().handle(new 
SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
+        }
+      }, testDelay, TimeUnit.MILLISECONDS);
+      return true;
+    }
+
+    @Override
+    public void handle(SchedulerEvent event) {
+      super.handle(event);
+      barrier.release();
+    }
+
+    @Override
+    protected QueryInfo getQueryInfo(QueryId queryId) {
+      return queryInfoMap.get(queryId);
+    }
+
+    @Override
+    public void stopQuery(QueryId queryId) {
+      queryInfoMap.remove(queryId);
+      AllocationResourceProto allocationResourceProto = 
qmAllocationMap.remove(queryId);
+      
NodeResources.addTo(rmContext.getNodes().get(allocationResourceProto.getWorkerId()).getAvailableResource(),
+          new NodeResource(allocationResourceProto.getResource()));
+      super.stopQuery(queryId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java 
b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 1351716..3e8e230 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -19,10 +19,11 @@
 package org.apache.tajo.querymaster;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.tajo.*;
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.benchmark.TPCH;
 import org.apache.tajo.catalog.CatalogService;
@@ -34,26 +35,19 @@ import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryEvent;
+import org.apache.tajo.master.event.QueryEventType;
+import org.apache.tajo.master.event.StageEvent;
+import org.apache.tajo.master.event.StageEventType;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.resource.NodeResources;
 import org.apache.tajo.session.Session;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.TablespaceManager;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.util.history.HistoryReader;
-import org.apache.tajo.util.history.HistoryWriter;
-import org.apache.tajo.util.metrics.TajoSystemMetrics;
-import org.apache.tajo.worker.ExecutionBlockContext;
-import org.apache.tajo.worker.LegacyTaskImpl;
-import org.apache.tajo.worker.TajoWorker;
-import org.apache.tajo.worker.TaskRunnerManager;
+import org.apache.tajo.worker.*;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -123,7 +117,7 @@ public class TestKillQuery {
 
     QueryMaster qm = 
cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
     QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
-        queryId, session, defaultContext, expr.toJson(), dispatch);
+        queryId, session, defaultContext, expr.toJson(), 
NodeResources.createResource(512), dispatch);
 
     queryMasterTask.init(conf);
     queryMasterTask.getQueryTaskContext().getDispatcher().start();
@@ -187,7 +181,7 @@ public class TestKillQuery {
 
     QueryMaster qm = 
cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
     QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
-        queryId, session, defaultContext, expr.toJson(), dispatch);
+        queryId, session, defaultContext, expr.toJson(), 
NodeResources.createResource(512), dispatch);
 
     queryMasterTask.init(conf);
     queryMasterTask.getQueryTaskContext().getDispatcher().start();
@@ -223,9 +217,6 @@ public class TestKillQuery {
     lastStage.getStateMachine().doTransition(StageEventType.SQ_KILL,
         new StageEvent(lastStage.getId(), StageEventType.SQ_KILL));
 
-    
lastStage.getStateMachine().doTransition(StageEventType.SQ_CONTAINER_ALLOCATED,
-        new StageEvent(lastStage.getId(), 
StageEventType.SQ_CONTAINER_ALLOCATED));
-
     lastStage.getStateMachine().doTransition(StageEventType.SQ_SHUFFLE_REPORT,
         new StageEvent(lastStage.getId(), StageEventType.SQ_SHUFFLE_REPORT));
 
@@ -243,22 +234,21 @@ public class TestKillQuery {
     TaskId tid = QueryIdFactory.newTaskId(eid);
     final TajoConf conf = new TajoConf();
     TaskRequestImpl taskRequest = new TaskRequestImpl();
+    WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
 
-    taskRequest.set(null, new ArrayList<CatalogProtos.FragmentProto>(),
-        null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new 
QueryContext(conf), null, null);
-    taskRequest.setInterQuery();
     TaskAttemptId attemptId = new TaskAttemptId(tid, 1);
+    taskRequest.set(attemptId, new ArrayList<CatalogProtos.FragmentProto>(),
+        null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new 
QueryContext(conf),
+        null, null, queryMaster.getHostAndQMPort());
+    taskRequest.setInterQuery();
 
-    WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 
28092, 21000, 28093, 28080);
-    TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder
-        requestProto = 
TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder();
 
-    requestProto.setExecutionBlockId(eid.getProto())
-        .setQueryMaster(queryMaster.getProto())
-        .setNodeId(queryMaster.getHost()+":" + 
queryMaster.getQueryMasterPort())
-        .setContainerId("test")
-        .setQueryContext(new QueryContext(conf).getProto())
+    ExecutionBlockContextResponse.Builder requestProtoBuilder =
+        ExecutionBlockContextResponse.newBuilder();
+    requestProtoBuilder.setExecutionBlockId(eid.getProto())
         .setPlanJson("test")
+        .setQueryContext(new QueryContext(conf).getProto())
+        .setQueryOutputPath("testpath")
         .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
 
     TajoWorker.WorkerContext workerContext = new MockWorkerContext() {
@@ -266,13 +256,31 @@ public class TestKillQuery {
       public TajoConf getConf() {
         return conf;
       }
+
+      @Override
+      public TaskManager getTaskManager() {
+        return null;
+      }
+
+      @Override
+      public TaskExecutor getTaskExecuor() {
+        return null;
+      }
+
+      @Override
+      public NodeResourceManager getNodeResourceManager() {
+        return null;
+      }
     };
 
-    ExecutionBlockContext context =
-        new ExecutionBlockContext(workerContext, null, requestProto.build());
+    ExecutionBlockContext context = new MockExecutionBlock(workerContext, 
requestProtoBuilder.build()) {
+      @Override
+      public Path createBaseDir() throws IOException {
+        return new Path("test");
+      }
+    };
 
-    org.apache.tajo.worker.Task task = new LegacyTaskImpl("test", 
CommonTestingUtil.getTestDir(), attemptId,
-        conf, context, taskRequest);
+    org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context, 
null);
     task.kill();
     assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, 
task.getTaskContext().getState());
     try {
@@ -301,94 +309,4 @@ public class TestKillQuery {
       super.dispatch(event);
     }
   }
-
-  abstract class MockWorkerContext implements TajoWorker.WorkerContext {
-
-    @Override
-    public QueryMaster getQueryMaster() {
-      return null;
-    }
-
-    public abstract TajoConf getConf();
-
-    @Override
-    public ServiceTracker getServiceTracker() {
-      return null;
-    }
-
-    @Override
-    public QueryMasterManagerService getQueryMasterManagerService() {
-      return null;
-    }
-
-    @Override
-    public TaskRunnerManager getTaskRunnerManager() {
-      return null;
-    }
-
-    @Override
-    public CatalogService getCatalog() {
-      return null;
-    }
-
-    @Override
-    public WorkerConnectionInfo getConnectionInfo() {
-      return null;
-    }
-
-    @Override
-    public String getWorkerName() {
-      return null;
-    }
-
-    @Override
-    public LocalDirAllocator getLocalDirAllocator() {
-      return null;
-    }
-
-    @Override
-    public QueryCoordinatorProtocol.ClusterResourceSummary 
getClusterResource() {
-      return null;
-    }
-
-    @Override
-    public TajoSystemMetrics getWorkerSystemMetrics() {
-      return null;
-    }
-
-    @Override
-    public HashShuffleAppenderManager getHashShuffleAppenderManager() {
-      return null;
-    }
-
-    @Override
-    public HistoryWriter getTaskHistoryWriter() {
-      return null;
-    }
-
-    @Override
-    public HistoryReader getHistoryReader() {
-      return null;
-    }
-
-    @Override
-    public void cleanup(String strPath) {
-
-    }
-
-    @Override
-    public void cleanupTemporalDirectories() {
-
-    }
-
-    @Override
-    public void 
setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary 
clusterResource) {
-
-    }
-
-    @Override
-    public void setNumClusterNodes(int numClusterNodes) {
-
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java 
b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
index a822e42..d2241b4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java
@@ -18,28 +18,36 @@
 
 package org.apache.tajo.querymaster;
 
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.tajo.*;
+import org.apache.tajo.client.QueryStatus;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.master.QueryManager;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
+@NotThreadSafe
 public class TestQueryState {
   private static TajoTestingCluster cluster;
   private static TajoClient client;
 
   @BeforeClass
-  public static void setUp() throws Exception {
+  public static void setUpClass() throws Exception {
     cluster = TpchTestBase.getInstance().getTestingCluster();
     client = cluster.newTajoClient();
   }
 
+  @AfterClass
+  public static void tearDownClass() {
+    client.close();
+  }
+
   @Test(timeout = 10000)
   public void testSucceededState() throws Exception {
     String queryStr = "select l_orderkey from lineitem group by l_orderkey 
order by l_orderkey";
@@ -61,16 +69,21 @@ public class TestQueryState {
 
     ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
     QueryId queryId = new QueryId(res.getQueryId());
-    cluster.waitForQuerySubmitted(queryId);
+
+    QueryStatus queryState = client.getQueryStatus(queryId);
+    while (!TajoClientUtil.isQueryComplete(queryState.getState())) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        fail("Query state : " + queryState);
+      }
+      queryState = client.getQueryStatus(queryId);
+    }
 
     QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
     Query query = qmt.getQuery();
 
-    // wait for query complete
-    cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_SUCCEEDED, 
100);
-
     assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, qmt.getState());
-
     assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, 
query.getSynchronizedState());
     assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getState());
 
@@ -80,13 +93,6 @@ public class TestQueryState {
       assertEquals(StageState.SUCCEEDED, stage.getState());
     }
 
-    /* wait for heartbeat from QueryMaster */
-    QueryManager queryManager = 
cluster.getMaster().getContext().getQueryJobManager();
-    for (; ; ) {
-      if (queryManager.getFinishedQuery(queryId) != null) break;
-      else Thread.sleep(100);
-    }
-
     /* get status from TajoMaster */
     assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, 
client.getQueryStatus(queryId).getState());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java 
b/tajo-core/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
index 51622b5..0ce2646 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java
@@ -124,6 +124,7 @@ public class TestSystemMetrics {
     }
 
     assertEquals(2, lines.size());
+    tajoSystemMetrics.stop();
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
index 9d4e1f3..7d7fb1a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
@@ -18,16 +18,16 @@
 
 package org.apache.tajo.worker;
 
+import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
 import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 
 import java.io.IOException;
 
 public class MockExecutionBlock extends ExecutionBlockContext {
 
   public MockExecutionBlock(TajoWorker.WorkerContext workerContext,
-                            TajoWorkerProtocol.RunExecutionBlockRequestProto 
request) throws IOException {
-    super(workerContext, null, request);
+                            ExecutionBlockContextResponse request) throws 
IOException {
+    super(workerContext, request, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
index 18b9405..8c8427d 100644
--- 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
+++ 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java
@@ -19,11 +19,11 @@
 package org.apache.tajo.worker;
 
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.ResourceProtos.TaskAllocationProto;
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
 import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.resource.NodeResources;
@@ -38,8 +38,8 @@ public class MockNodeResourceManager extends 
NodeResourceManager {
   volatile boolean enableTaskHandlerEvent = true;
   private final Semaphore barrier;
 
-  public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, 
EventHandler taskEventHandler) {
-    super(dispatcher, taskEventHandler);
+  public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, 
TajoWorker.WorkerContext workerContext) {
+    super(dispatcher, workerContext);
     this.barrier = barrier;
   }
 
@@ -50,14 +50,7 @@ public class MockNodeResourceManager extends 
NodeResourceManager {
   }
 
   @Override
-  protected void 
startExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) {
-    if(enableTaskHandlerEvent) {
-      super.startExecutionBlock(request);
-    }
-  }
-
-  @Override
-  protected void startTask(TajoWorkerProtocol.TaskRequestProto request, 
NodeResource resource) {
+  protected void startTask(TaskRequestProto request, NodeResource resource) {
     if(enableTaskHandlerEvent) {
       super.startTask(request, resource);
     }
@@ -70,24 +63,23 @@ public class MockNodeResourceManager extends 
NodeResourceManager {
     enableTaskHandlerEvent = flag;
   }
 
-  protected static Queue<TajoWorkerProtocol.TaskAllocationRequestProto> 
createTaskRequests(
+  protected static Queue<TaskAllocationProto> createTaskRequests(
       ExecutionBlockId ebId, int memory, int size) {
 
-    Queue<TajoWorkerProtocol.TaskAllocationRequestProto>
-        requestProtoList = new 
LinkedBlockingQueue<TajoWorkerProtocol.TaskAllocationRequestProto>();
+    Queue<TaskAllocationProto>
+        requestProtoList = new LinkedBlockingQueue<TaskAllocationProto>();
     for (int i = 0; i < size; i++) {
 
       TaskAttemptId taskAttemptId = 
QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId, i), 0);
-      TajoWorkerProtocol.TaskRequestProto.Builder builder =
-          TajoWorkerProtocol.TaskRequestProto.newBuilder();
+      TaskRequestProto.Builder builder = TaskRequestProto.newBuilder();
+      builder.setQueryMasterHostAndPort("localhost:0");
       builder.setId(taskAttemptId.getProto());
-      builder.setShouldDie(true);
       builder.setOutputTable("");
       builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
       builder.setClusteredOutput(false);
 
 
-      
requestProtoList.add(TajoWorkerProtocol.TaskAllocationRequestProto.newBuilder()
+      requestProtoList.add(TaskAllocationProto.newBuilder()
           .setResource(NodeResources.createResource(memory).getProto())
           .setTaskRequest(builder.build()).build());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
index dfcfd4f..634398f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java
@@ -21,8 +21,7 @@ package org.apache.tajo.worker;
 import com.google.common.collect.Maps;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import 
org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
 import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.resource.NodeResources;
 
@@ -30,7 +29,7 @@ import java.net.ConnectException;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*;
+import static org.apache.tajo.ResourceProtos.*;
 
 public class MockNodeStatusUpdater extends NodeStatusUpdater {
 
@@ -39,9 +38,8 @@ public class MockNodeStatusUpdater extends NodeStatusUpdater {
   private Map<Integer, NodeResource> resources = Maps.newHashMap();
   private MockResourceTracker resourceTracker;
 
-  public MockNodeStatusUpdater(CountDownLatch barrier, 
TajoWorker.WorkerContext workerContext,
-                               NodeResourceManager resourceManager) {
-    super(workerContext, resourceManager);
+  public MockNodeStatusUpdater(CountDownLatch barrier, 
TajoWorker.WorkerContext workerContext) {
+    super(workerContext);
     this.barrier = barrier;
     this.resourceTracker = new MockResourceTracker();
   }
@@ -58,7 +56,7 @@ public class MockNodeStatusUpdater extends NodeStatusUpdater {
   }
 
   class MockResourceTracker implements 
TajoResourceTrackerProtocolService.Interface {
-    private NodeHeartbeatRequestProto lastRequest;
+    private NodeHeartbeatRequest lastRequest;
 
     protected Map<Integer, NodeResource> getTotalResource() {
       return membership;
@@ -68,21 +66,15 @@ public class MockNodeStatusUpdater extends 
NodeStatusUpdater {
       return membership;
     }
 
-    protected NodeHeartbeatRequestProto getLastRequest() {
+    protected NodeHeartbeatRequest getLastRequest() {
       return lastRequest;
     }
 
     @Override
-    public void heartbeat(RpcController controller, NodeHeartbeat request,
-                          
RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) {
+    public void nodeHeartbeat(RpcController controller, NodeHeartbeatRequest 
request,
+                              RpcCallback<NodeHeartbeatResponse> done) {
 
-    }
-
-    @Override
-    public void nodeHeartbeat(RpcController controller, 
NodeHeartbeatRequestProto request,
-                              RpcCallback<NodeHeartbeatResponseProto> done) {
-
-      NodeHeartbeatResponseProto.Builder response = 
NodeHeartbeatResponseProto.newBuilder();
+      NodeHeartbeatResponse.Builder response = 
NodeHeartbeatResponse.newBuilder();
       if (membership.containsKey(request.getWorkerId())) {
         if (request.hasAvailableResource()) {
           NodeResource resource = resources.get(request.getWorkerId());

Reply via email to