http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java 
b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index 0c8d8ce..b4ed5fd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.master;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.map.LRUMap;
@@ -30,12 +29,15 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.ResourceProtos;
+import org.apache.tajo.ResourceProtos.AllocationResourceProto;
+import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest;
+import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+import org.apache.tajo.master.scheduler.QuerySchedulingInfo;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.querymaster.QueryJobEvent;
 import org.apache.tajo.session.Session;
@@ -54,14 +56,13 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class QueryManager extends CompositeService {
   private static final Log LOG = 
LogFactory.getLog(QueryManager.class.getName());
+  private static final String EMPTY_QM_HOSTNAME = "";
 
   // TajoMaster Context
   private final TajoMaster.MasterContext masterContext;
 
   private AsyncDispatcher dispatcher;
 
-  private SimpleFifoScheduler scheduler;
-
   private final Map<QueryId, QueryInProgress> submittedQueries = 
Maps.newConcurrentMap();
 
   private final Map<QueryId, QueryInProgress> runningQueries = 
Maps.newConcurrentMap();
@@ -85,7 +86,6 @@ public class QueryManager extends CompositeService {
 
       this.dispatcher.register(QueryJobEvent.Type.class, new 
QueryJobManagerEventHandler());
 
-      this.scheduler = new SimpleFifoScheduler(this);
     } catch (Exception e) {
       LOG.error("Failed to init service " + getName() + " by exception " + e, 
e);
     }
@@ -95,18 +95,15 @@ public class QueryManager extends CompositeService {
 
   @Override
   public void serviceStop() throws Exception {
-    synchronized(runningQueries) {
-      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
-        eachQueryInProgress.stopProgress();
-      }
+    for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
+      eachQueryInProgress.stopProgress();
     }
-    this.scheduler.stop();
+
     super.serviceStop();
   }
 
   @Override
   public void serviceStart() throws Exception {
-    this.scheduler.start();
     super.serviceStart();
   }
 
@@ -115,28 +112,25 @@ public class QueryManager extends CompositeService {
   }
 
   public Collection<QueryInProgress> getSubmittedQueries() {
-    synchronized (submittedQueries){
-      return Collections.unmodifiableCollection(submittedQueries.values());
-    }
+    return Collections.unmodifiableCollection(submittedQueries.values());
   }
 
   public Collection<QueryInProgress> getRunningQueries() {
-    synchronized (runningQueries){
-      return Collections.unmodifiableCollection(runningQueries.values());
-    }
+    return Collections.unmodifiableCollection(runningQueries.values());
   }
 
   public synchronized Collection<QueryInfo> getFinishedQueries() {
+    Set<QueryInfo> result = Sets.newTreeSet();
+    synchronized (historyCache) {
+      result.addAll(historyCache.values());
+    }
+
     try {
-      Set<QueryInfo> result = Sets.newTreeSet();
       result.addAll(this.masterContext.getHistoryReader().getQueries(null));
-      synchronized (historyCache) {
-        result.addAll(historyCache.values());
-      }
       return result;
     } catch (Throwable e) {
       LOG.error(e, e);
-      return Lists.newArrayList();
+      return result;
     }
   }
 
@@ -171,6 +165,9 @@ public class QueryManager extends CompositeService {
     return queryInProgress.getQueryInfo();
   }
 
+  /**
+   * Registers the query as submitted and passes it to the resource manager for scheduling.
+   */
   public QueryInfo scheduleQuery(Session session, QueryContext queryContext, 
String sql,
                                  String jsonExpr, LogicalRootNode plan)
       throws Exception {
@@ -178,35 +175,29 @@ public class QueryManager extends CompositeService {
     QueryInProgress queryInProgress = new QueryInProgress(masterContext, 
session, queryContext, queryId, sql,
         jsonExpr, plan);
 
-    synchronized (submittedQueries) {
-      queryInProgress.getQueryInfo().setQueryMaster("");
-      submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
+    queryInProgress.getQueryInfo().setQueryMaster(EMPTY_QM_HOSTNAME);
+    submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
+    //TODO implement scheduler queue
+    QuerySchedulingInfo querySchedulingInfo = new 
QuerySchedulingInfo("default", queryContext.getUser(),
+        queryInProgress.getQueryId(), 1, 
queryInProgress.getQueryInfo().getStartTime());
 
-    scheduler.addQuery(queryInProgress);
+    masterContext.getResourceManager().submitQuery(querySchedulingInfo);
     return queryInProgress.getQueryInfo();
   }
 
-  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
-
-    QueryInProgress queryInProgress;
+  /**
+   * Tries to start a submitted query; returns true only if the query master allocation succeeded.
+   */
+  public boolean startQueryJob(QueryId queryId, AllocationResourceProto 
allocation) {
 
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.remove(queryId);
-    }
-
-    synchronized (runningQueries) {
+    if (submittedQueries.get(queryId).allocateToQueryMaster(allocation)) {
+      QueryInProgress queryInProgress = submittedQueries.remove(queryId);
       runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    if (queryInProgress.startQueryMaster()) {
       dispatcher.getEventHandler().handle(new 
QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
           queryInProgress.getQueryInfo()));
-    } else {
-      
masterContext.getQueryJobManager().stopQuery(queryInProgress.getQueryId());
+      return true;
     }
-
-    return queryInProgress.getQueryInfo();
+    return false;
   }
 
   class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
@@ -221,10 +212,10 @@ public class QueryManager extends CompositeService {
       }
 
       if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
-        queryInProgress.submitQueryToMaster();
+        queryInProgress.submitToQueryMaster();
 
       } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
-        scheduler.removeQuery(queryInProgress.getQueryId());
+
         queryInProgress.kill();
         stopQuery(queryInProgress.getQueryId());
 
@@ -236,14 +227,10 @@ public class QueryManager extends CompositeService {
 
   public QueryInProgress getQueryInProgress(QueryId queryId) {
     QueryInProgress queryInProgress;
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.get(queryId);
-    }
+    queryInProgress = submittedQueries.get(queryId);
 
     if (queryInProgress == null) {
-      synchronized (runningQueries) {
-        queryInProgress = runningQueries.get(queryId);
-      }
+      queryInProgress = runningQueries.get(queryId);
     }
     return queryInProgress;
   }
@@ -253,13 +240,8 @@ public class QueryManager extends CompositeService {
     QueryInProgress queryInProgress = getQueryInProgress(queryId);
     if(queryInProgress != null) {
       queryInProgress.stopProgress();
-      synchronized(submittedQueries) {
-        submittedQueries.remove(queryId);
-      }
-
-      synchronized(runningQueries) {
-        runningQueries.remove(queryId);
-      }
+      submittedQueries.remove(queryId);
+      runningQueries.remove(queryId);
 
       QueryInfo queryInfo = queryInProgress.getQueryInfo();
       synchronized (historyCache) {
@@ -304,8 +286,8 @@ public class QueryManager extends CompositeService {
     return executedQuerySize.get();
   }
 
-  public synchronized 
QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
-      QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+  public synchronized TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+      TajoHeartbeatRequest queryHeartbeat) {
     QueryInProgress queryInProgress = getQueryInProgress(new 
QueryId(queryHeartbeat.getQueryId()));
     if(queryInProgress == null) {
       return null;
@@ -317,7 +299,7 @@ public class QueryManager extends CompositeService {
     return null;
   }
 
-  private QueryInfo 
makeQueryInfoFromHeartbeat(QueryCoordinatorProtocol.TajoHeartbeat 
queryHeartbeat) {
+  private QueryInfo 
makeQueryInfoFromHeartbeat(ResourceProtos.TajoHeartbeatRequest queryHeartbeat) {
     QueryInfo queryInfo = new QueryInfo(new 
QueryId(queryHeartbeat.getQueryId()));
     WorkerConnectionInfo connectionInfo = new 
WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
deleted file mode 100644
index 2aac005..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.master.event.TaskFatalErrorEvent;
-import org.apache.tajo.master.rm.TajoWorkerContainer;
-import org.apache.tajo.master.rm.TajoWorkerContainerId;
-import org.apache.tajo.plan.serder.PlanProto;
-import org.apache.tajo.querymaster.QueryMasterTask;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.worker.TajoWorker;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class TajoContainerProxy extends ContainerProxy {
-  private final QueryContext queryContext;
-  private final TajoWorker.WorkerContext workerContext;
-  private final String planJson;
-
-  public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context,
-                            Configuration conf, TajoContainer container,
-                            QueryContext queryContext, ExecutionBlockId 
executionBlockId, String planJson) {
-    super(context, conf, executionBlockId, container);
-    this.queryContext = queryContext;
-    this.workerContext = context.getQueryMasterContext().getWorkerContext();
-    this.planJson = planJson;
-  }
-
-  @Override
-  public synchronized void launch(ContainerLaunchContext 
containerLaunchContext) {
-    context.getResourceAllocator().addContainer(containerId, this);
-
-    this.hostName = container.getNodeId().getHost();
-    this.port = 
((TajoWorkerContainer)container).getWorkerResource().getConnectionInfo().getPullServerPort();
-    this.state = ContainerState.RUNNING;
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Launch Container:" + executionBlockId + "," + 
containerId.getId() + "," +
-          container.getId() + "," + container.getNodeId() + ", pullServer=" + 
port);
-    }
-
-    assignExecutionBlock(executionBlockId, container);
-  }
-
-  /**
-   * It sends a kill RPC request to a corresponding worker.
-   *
-   * @param taskAttemptId The TaskAttemptId to be killed.
-   */
-  public void killTaskAttempt(TaskAttemptId taskAttemptId) {
-    NettyClientBase tajoWorkerRpc = null;
-    try {
-      InetSocketAddress addr = new 
InetSocketAddress(container.getNodeId().getHost(), 
container.getNodeId().getPort());
-      tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, 
TajoWorkerProtocol.class, true);
-      TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = 
tajoWorkerRpc.getStub();
-      tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), 
NullCallback.get());
-    } catch (Throwable e) {
-      /* Worker RPC failure */
-      context.getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, 
e.getMessage()));
-    }
-  }
-
-  private void assignExecutionBlock(ExecutionBlockId executionBlockId, 
TajoContainer container) {
-    NettyClientBase tajoWorkerRpc;
-    try {
-
-      InetSocketAddress addr = new 
InetSocketAddress(container.getNodeId().getHost(), 
container.getNodeId().getPort());
-      tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, 
TajoWorkerProtocol.class, true);
-      TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = 
tajoWorkerRpc.getStub();
-
-      PlanProto.ShuffleType shuffleType =
-          
context.getQuery().getStage(executionBlockId).getDataChannel().getShuffleType();
-
-      TajoWorkerProtocol.RunExecutionBlockRequestProto request =
-          TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder()
-              .setExecutionBlockId(executionBlockId.getProto())
-              
.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getProto())
-              .setNodeId(container.getNodeId().toString())
-              .setContainerId(container.getId().toString())
-              .setQueryOutputPath(context.getStagingDir().toString())
-              .setQueryContext(queryContext.getProto())
-              .setPlanJson(planJson)
-              .setShuffleType(shuffleType)
-              .build();
-
-      tajoWorkerRpcClient.startExecutionBlock(null, request, 
NullCallback.get());
-    } catch (Throwable e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public synchronized void stopContainer() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Release TajoWorker Resource: " + executionBlockId + "," + 
containerId + ", state:" + this.state);
-    }
-    if(isCompletelyDone()) {
-      LOG.info("Container already stopped:" + containerId);
-      return;
-    }
-    if(this.state == ContainerState.PREP) {
-      this.state = ContainerState.KILLED_BEFORE_LAUNCH;
-    } else {
-      try {
-        releaseWorkerResource(context, executionBlockId, 
Arrays.asList(containerId));
-        context.getResourceAllocator().removeContainer(containerId);
-      } catch (Throwable t) {
-        // ignore the cleanup failure
-        String message = "cleanup failed for container "
-            + this.containerId + " : "
-            + StringUtils.stringifyException(t);
-        LOG.warn(message);
-      } finally {
-        this.state = ContainerState.DONE;
-      }
-    }
-  }
-
-  public static void 
releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
-                                           ExecutionBlockId executionBlockId,
-                                           List<TajoContainerId> containerIds) 
throws Exception {
-    List<ContainerProtocol.TajoContainerIdProto> containerIdProtos =
-        new ArrayList<ContainerProtocol.TajoContainerIdProto>();
-
-    for(TajoContainerId eachContainerId: containerIds) {
-      
containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId));
-    }
-
-    RpcClientManager manager = RpcClientManager.getInstance();
-    NettyClientBase tmClient = null;
-
-    ServiceTracker serviceTracker = 
context.getQueryMasterContext().getWorkerContext().getServiceTracker();
-    tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), 
QueryCoordinatorProtocol.class, true);
-
-    QueryCoordinatorProtocol.QueryCoordinatorProtocolService 
masterClientService = tmClient.getStub();
-    masterClientService.releaseWorkerResource(null,
-        QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder()
-            .setExecutionBlockId(executionBlockId.getProto())
-            .addAllContainerIds(containerIdProtos)
-            .build(),
-        NullCallback.get());
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index e1e85dd..9327c59 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -21,14 +21,10 @@ package org.apache.tajo.master;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
@@ -42,8 +38,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.function.FunctionSignature;
-import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.master.rm.TajoResourceManager;
 import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
 import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.rpc.RpcChannelFactory;
@@ -68,7 +63,6 @@ import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
-import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -121,7 +115,7 @@ public class TajoMaster extends CompositeService {
   private QueryCoordinatorService tajoMasterService;
   private SessionManager sessionManager;
 
-  private WorkerResourceManager resourceManager;
+  private TajoResourceManager resourceManager;
   //Web Server
   private StaticHttpServer webServer;
   private TajoRestService restServer;
@@ -157,66 +151,59 @@ public class TajoMaster extends CompositeService {
   }
 
   @Override
-  public void serviceInit(Configuration _conf) throws Exception {
-    if (!(_conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("_conf should be a TajoConf type.");
-    }
-    this.systemConf = (TajoConf) _conf;
+  public void serviceInit(Configuration conf) throws Exception {
+
+    this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
     Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
 
     context = new MasterContext(systemConf);
     clock = new SystemClock();
 
-    try {
-      RackResolver.init(systemConf);
+    RackResolver.init(systemConf);
 
-      RpcClientManager rpcManager = RpcClientManager.getInstance();
-      
rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, 
RpcConstants.DEFAULT_RPC_RETRIES));
-      rpcManager.setTimeoutSeconds(
-          systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, 
RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS));
+    RpcClientManager rpcManager = RpcClientManager.getInstance();
+    rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, 
RpcConstants.DEFAULT_RPC_RETRIES));
+    rpcManager.setTimeoutSeconds(
+        systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, 
RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS));
 
-      initResourceManager();
+    initResourceManager();
 
-      this.dispatcher = new AsyncDispatcher();
-      addIfService(dispatcher);
+    this.dispatcher = new AsyncDispatcher();
+    addIfService(dispatcher);
 
       // check the system directory and create if they are not created.
       checkAndInitializeSystemDirectories();
       diagnoseTajoMaster();
 
-      catalogServer = new CatalogServer(loadFunctions());
-      addIfService(catalogServer);
-      catalog = new LocalCatalogWrapper(catalogServer, systemConf);
+    catalogServer = new CatalogServer(loadFunctions());
+    addIfService(catalogServer);
+    catalog = new LocalCatalogWrapper(catalogServer, systemConf);
 
-      sessionManager = new SessionManager(dispatcher);
-      addIfService(sessionManager);
+    sessionManager = new SessionManager(dispatcher);
+    addIfService(sessionManager);
 
-      globalEngine = new GlobalEngine(context);
-      addIfService(globalEngine);
+    globalEngine = new GlobalEngine(context);
+    addIfService(globalEngine);
 
-      queryManager = new QueryManager(context);
-      addIfService(queryManager);
+    queryManager = new QueryManager(context);
+    addIfService(queryManager);
 
-      tajoMasterClientService = new TajoMasterClientService(context);
-      addIfService(tajoMasterClientService);
+    tajoMasterClientService = new TajoMasterClientService(context);
+    addIfService(tajoMasterClientService);
 
-      tajoMasterService = new QueryCoordinatorService(context);
-      addIfService(tajoMasterService);
-      
-      restServer = new TajoRestService(context);
-      addIfService(restServer);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      throw e;
-    }
+    tajoMasterService = new QueryCoordinatorService(context);
+    addIfService(tajoMasterService);
 
+    restServer = new TajoRestService(context);
+    addIfService(restServer);
+    
     // Try to start up all services in TajoMaster.
     // If anyone is failed, the master prints out the errors and immediately 
should shutdowns
     try {
       super.serviceInit(systemConf);
     } catch (Throwable t) {
       t.printStackTrace();
-      System.exit(1);
+      Runtime.getRuntime().halt(-1);
     }
     LOG.info("Tajo Master is initialized.");
   }
@@ -235,10 +222,7 @@ public class TajoMaster extends CompositeService {
   }
 
   private void initResourceManager() throws Exception {
-    Class<WorkerResourceManager>  resourceManagerClass = 
(Class<WorkerResourceManager>)
-        systemConf.getClass(ConfVars.RESOURCE_MANAGER_CLASS.varname, 
TajoWorkerResourceManager.class);
-    Constructor<WorkerResourceManager> constructor = 
resourceManagerClass.getConstructor(MasterContext.class);
-    resourceManager = constructor.newInstance(context);
+    resourceManager = new TajoResourceManager(context);
     addIfService(resourceManager);
   }
 
@@ -391,39 +375,19 @@ public class TajoMaster extends CompositeService {
   }
 
   @Override
-  public void stop() {
-    if (haService != null) {
-      try {
-        haService.delete();
-      } catch (Exception e) {
-        LOG.error(e, e);
-      }
-    }
-    
-    if (restServer != null) {
-      try {
-        restServer.stop();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-      }
-    }
+  public void serviceStop() throws Exception {
+    if (haService != null) haService.delete();
 
-    if (webServer != null) {
-      try {
-        webServer.stop();
-      } catch (Exception e) {
-        LOG.error(e, e);
-      }
-    }
+    if (restServer != null) restServer.stop();
+
+    if (webServer != null) webServer.stop();
 
     IOUtils.cleanup(LOG, catalogServer);
 
-    if(systemMetrics != null) {
-      systemMetrics.stop();
-    }
+    if (systemMetrics != null) systemMetrics.stop();
 
-    if(pauseMonitor != null) pauseMonitor.stop();
-    super.stop();
+    if (pauseMonitor != null) pauseMonitor.stop();
+    super.serviceStop();
 
     LOG.info("Tajo Master main thread exiting");
   }
@@ -463,7 +427,7 @@ public class TajoMaster extends CompositeService {
       return queryManager;
     }
 
-    public WorkerResourceManager getResourceManager() {
+    public TajoResourceManager getResourceManager() {
       return resourceManager;
     }
 
@@ -601,7 +565,7 @@ public class TajoMaster extends CompositeService {
 
     try {
       TajoMaster master = new TajoMaster();
-      TajoConf conf = new TajoConf(new YarnConfiguration());
+      TajoConf conf = new TajoConf();
       master.init(conf);
       master.start();
     } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 7dbe815..e0332d5 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -47,8 +47,7 @@ import 
org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolServ
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.exec.NonForwardQueryResultFileScanner;
 import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.NodeStatus;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.PartitionedTableScanNode;
 import org.apache.tajo.plan.logical.ScanNode;
@@ -95,32 +94,27 @@ public class TajoMasterClientService extends 
AbstractService {
   }
 
   @Override
-  public void start() {
+  public void serviceStart() throws Exception {
 
     // start the rpc server
     String confClientServiceAddr = 
conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
     InetSocketAddress initIsa = 
NetUtils.createSocketAddr(confClientServiceAddr);
     int workerNum = 
conf.getIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
-    try {
-      server = new BlockingRpcServer(TajoMasterClientProtocol.class, 
clientHandler, initIsa, workerNum);
-    } catch (Exception e) {
-      LOG.error(e);
-      throw new RuntimeException(e);
-    }
+    server = new BlockingRpcServer(TajoMasterClientProtocol.class, 
clientHandler, initIsa, workerNum);
     server.start();
 
     bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
     this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, 
NetUtils.normalizeInetSocketAddress(bindAddress));
+    super.serviceStart();
     LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);
-    super.start();
   }
 
   @Override
-  public void stop() {
+  public void serviceStop() throws Exception {
     if (server != null) {
       server.shutdown();
     }
-    super.stop();
+    super.serviceStop();
   }
 
   public InetSocketAddress getBindAddress() {
@@ -590,7 +584,7 @@ public class TajoMasterClientService extends 
AbstractService {
         QueryManager queryManager = context.getQueryJobManager();
         QueryInProgress queryInProgress = 
queryManager.getQueryInProgress(queryId);
 
-        QueryInfo queryInfo = null;
+        QueryInfo queryInfo;
         if (queryInProgress == null) {
           queryInfo = context.getQueryJobManager().getFinishedQuery(queryId);
         } else {
@@ -610,9 +604,6 @@ public class TajoMasterClientService extends 
AbstractService {
       return builder.build();
     }
 
-    /**
-     * It is invoked by TajoContainerProxy.
-     */
     @Override
     public BoolProto killQuery(RpcController controller, QueryIdRequest 
request) throws ServiceException {
       try {
@@ -642,31 +633,20 @@ public class TajoMasterClientService extends 
AbstractService {
         context.getSessionManager().touch(request.getSessionId().getId());
         GetClusterInfoResponse.Builder builder= 
GetClusterInfoResponse.newBuilder();
 
-        Map<Integer, Worker> workers = 
context.getResourceManager().getWorkers();
-
-        List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet());
-        Collections.sort(wokerKeys);
-
-        WorkerResourceInfo.Builder workerBuilder
-          = WorkerResourceInfo.newBuilder();
-
-        for(Worker worker: workers.values()) {
-          WorkerResource workerResource = worker.getResource();
-
-          
workerBuilder.setConnectionInfo(worker.getConnectionInfo().getProto());
-          workerBuilder.setDiskSlots(workerResource.getDiskSlots());
-          workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
-          workerBuilder.setMemoryMB(workerResource.getMemoryMB());
-          workerBuilder.setLastHeartbeat(worker.getLastHeartbeatTime());
-          workerBuilder.setUsedMemoryMB(workerResource.getUsedMemoryMB());
-          
workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots());
-          workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots());
-          workerBuilder.setWorkerStatus(worker.getState().toString());
-          workerBuilder.setMaxHeap(workerResource.getMaxHeap());
-          workerBuilder.setFreeHeap(workerResource.getFreeHeap());
-          workerBuilder.setTotalHeap(workerResource.getTotalHeap());
-          
workerBuilder.setNumRunningTasks(workerResource.getNumRunningTasks());
-          
workerBuilder.setNumQueryMasterTasks(workerResource.getNumQueryMasterTasks());
+        List<NodeStatus> nodeStatusList = new 
ArrayList<NodeStatus>(context.getResourceManager().getRMContext().getNodes().values());
+        Collections.sort(nodeStatusList);
+
+        WorkerResourceInfo.Builder workerBuilder = 
WorkerResourceInfo.newBuilder();
+
+        for(NodeStatus nodeStatus : nodeStatusList) {
+          
workerBuilder.setConnectionInfo(nodeStatus.getConnectionInfo().getProto());
+          
workerBuilder.setAvailableResource(nodeStatus.getAvailableResource().getProto());
+          
workerBuilder.setTotalResource(nodeStatus.getTotalResourceCapability().getProto());
+
+          workerBuilder.setLastHeartbeat(nodeStatus.getLastHeartbeatTime());
+          workerBuilder.setWorkerStatus(nodeStatus.getState().toString());
+          workerBuilder.setNumRunningTasks(nodeStatus.getNumRunningTasks());
+          
workerBuilder.setNumQueryMasterTasks(nodeStatus.getNumRunningQueryMaster());
 
           builder.addWorkerList(workerBuilder.build());
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
deleted file mode 100644
index c1c6522..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.container.TajoContainer;
-
-import java.util.Collection;
-
-public class TaskRunnerGroupEvent extends AbstractEvent<EventType> {
-  public enum EventType {
-    CONTAINER_REMOTE_LAUNCH,
-    CONTAINER_REMOTE_CLEANUP
-  }
-
-  protected final ExecutionBlockId executionBlockId;
-  protected final Collection<TajoContainer> containers;
-  public TaskRunnerGroupEvent(EventType eventType,
-                              ExecutionBlockId executionBlockId,
-                              Collection<TajoContainer> containers) {
-    super(eventType);
-    this.executionBlockId = executionBlockId;
-    this.containers = containers;
-  }
-
-  public Collection<TajoContainer> getContainers() {
-    return containers;
-  }
-
-  public ExecutionBlockId getExecutionBlockId() {
-    return executionBlockId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java
deleted file mode 100644
index 9086e65..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TaskRunnerLauncher.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-
-public interface TaskRunnerLauncher extends EventHandler<TaskRunnerGroupEvent> 
{
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
index 78d4978..0159a21 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
@@ -111,6 +111,10 @@ public class WorkerConnectionInfo implements 
ProtoObject<WorkerConnectionInfoPro
     return this.getHost() + ":" + this.getPeerRpcPort();
   }
 
+  public String getHostAndQMPort() {
+    return this.getHost() + ":" + this.getQueryMasterPort();
+  }
+
   @Override
   public WorkerConnectionInfoProto getProto() {
     WorkerConnectionInfoProto.Builder builder = 
WorkerConnectionInfoProto.newBuilder();

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java 
b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java
deleted file mode 100644
index 77562b5..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainer.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.container;
-
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.util.Records;
-
-/**
- * This class is borrowed from the following source code :
- * 
${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
- *
- * <p><code>TajoContainer</code> represents an allocated resource in the 
cluster.
- * </p>
- *
- * <p>The <code>ResourceManager</code> is the sole authority to allocate any
- * <code>TajoContainer</code> to applications. The allocated 
<code>TajoContainer</code>
- * is always on a single node and has a unique {@link 
org.apache.tajo.master.container.TajoContainerId}. It has
- * a specific amount of {@link org.apache.hadoop.yarn.api.records.Resource} 
allocated.</p>
- *
- * <p>It includes details such as:
- *   <ul>
- *     <li>{@link org.apache.tajo.master.container.TajoContainerId} for the 
container, which is globally unique.</li>
- *     <li>
- *       {@link org.apache.hadoop.yarn.api.records.NodeId} of the node on 
which it is allocated.
- *     </li>
- *     <li>HTTP uri of the node.</li>
- *     <li>{@link org.apache.hadoop.yarn.api.records.Resource} allocated to 
the container.</li>
- *     <li>{@link org.apache.hadoop.yarn.api.records.Priority} at which the 
container was allocated.</li>
- *     <li>
- *       TajoContainer {@link org.apache.hadoop.yarn.api.records.Token} of the 
container, used to securely verify
- *       authenticity of the allocation. 
- *     </li>
- *   </ul>
- * </p>
- *
- * <p>Typically, an <code>ApplicationMaster</code> receives the 
- * <code>TajoContainer</code> from the <code>ResourceManager</code> during
- * resource-negotiation and then talks to the <code>NodeManager</code> to 
- * start/stop containers.</p>
- *
- * @see 
ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
- * @see 
org.apache.hadoop.yarn.api.ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
- * @see 
org.apache.hadoop.yarn.api.ContainerManagementProtocol#stopContainers(org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest)
- */
-@Public
-@Stable
-public abstract class TajoContainer implements Comparable<TajoContainer> {
-
-  @Private
-  @Unstable
-  public static TajoContainer newInstance(TajoContainerId containerId, NodeId 
nodeId,
-                                      String nodeHttpAddress, Resource 
resource, Priority priority,
-                                      Token containerToken) {
-    TajoContainer container = Records.newRecord(TajoContainer.class);
-    container.setId(containerId);
-    container.setNodeId(nodeId);
-    container.setNodeHttpAddress(nodeHttpAddress);
-    container.setResource(resource);
-    container.setPriority(priority);
-    container.setContainerToken(containerToken);
-    return container;
-  }
-
-  /**
-   * Get the globally unique identifier for the container.
-   * @return globally unique identifier for the container
-   */
-  @Public
-  @Stable
-  public abstract TajoContainerId getId();
-
-  @Private
-  @Unstable
-  public abstract void setId(TajoContainerId id);
-
-  /**
-   * Get the identifier of the node on which the container is allocated.
-   * @return identifier of the node on which the container is allocated
-   */
-  @Public
-  @Stable
-  public abstract NodeId getNodeId();
-
-  @Private
-  @Unstable
-  public abstract void setNodeId(NodeId nodeId);
-
-  /**
-   * Get the http uri of the node on which the container is allocated.
-   * @return http uri of the node on which the container is allocated
-   */
-  @Public
-  @Stable
-  public abstract String getNodeHttpAddress();
-
-  @Private
-  @Unstable
-  public abstract void setNodeHttpAddress(String nodeHttpAddress);
-
-  /**
-   * Get the <code>Resource</code> allocated to the container.
-   * @return <code>Resource</code> allocated to the container
-   */
-  @Public
-  @Stable
-  public abstract Resource getResource();
-
-  @Private
-  @Unstable
-  public abstract void setResource(Resource resource);
-
-  /**
-   * Get the <code>Priority</code> at which the <code>TajoContainer</code> was
-   * allocated.
-   * @return <code>Priority</code> at which the <code>TajoContainer</code> was
-   *         allocated
-   */
-  @Public
-  @Stable
-  public abstract Priority getPriority();
-
-  @Private
-  @Unstable
-  public abstract void setPriority(Priority priority);
-
-  /**
-   * Get the <code>TajoContainerToken</code> for the container.
-   * <p><code>TajoContainerToken</code> is the security token used by the 
framework
-   * to verify authenticity of any <code>TajoContainer</code>.</p>
-   *
-   * <p>The <code>ResourceManager</code>, on container allocation provides a
-   * secure token which is verified by the <code>NodeManager</code> on
-   * container launch.</p>
-   *
-   * <p>Applications do not need to care about 
<code>TajoContainerToken</code>, they
-   * are transparently handled by the framework - the allocated
-   * <code>TajoContainer</code> includes the 
<code>TajoContainerToken</code>.</p>
-   *
-   * @see 
ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
-   * @see 
org.apache.hadoop.yarn.api.ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
-   *
-   * @return <code>TajoContainerToken</code> for the container
-   */
-  @Public
-  @Stable
-  public abstract Token getContainerToken();
-
-  @Private
-  @Unstable
-  public abstract void setContainerToken(Token containerToken);
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java 
b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java
deleted file mode 100644
index 7bc27c6..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerId.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.container;
-
-import java.text.NumberFormat;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-/**
- * This class is borrowed from the following source code :
- * 
${hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java
- *
- * <p><code>TajoContainerId</code> represents a globally unique identifier
- * for a {@link org.apache.tajo.master.container.TajoContainer} in the 
cluster.</p>
- */
-@Public
-@Stable
-public abstract class TajoContainerId implements Comparable<TajoContainerId>{
-
-  @Private
-  @Unstable
-  public static TajoContainerId newInstance(ApplicationAttemptId appAttemptId,
-                                        int containerId) {
-    TajoContainerId id = new TajoContainerIdPBImpl();
-    id.setId(containerId);
-    id.setApplicationAttemptId(appAttemptId);
-    id.build();
-    return id;
-  }
-
-  /**
-   * Get the <code>ApplicationAttemptId</code> of the application to which the
-   * <code>Container</code> was assigned.
-   * <p>
-   * Note: If containers are kept alive across application attempts via
-   * {@link 
org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext#setKeepContainersAcrossApplicationAttempts(boolean)}
-   * the <code>TajoContainerId</code> does not necessarily contain the current
-   * running application attempt's <code>ApplicationAttemptId</code> This
-   * container can be allocated by previously exited application attempt and
-   * managed by the current running attempt thus have the previous application
-   * attempt's <code>ApplicationAttemptId</code>.
-   * </p>
-   *
-   * @return <code>ApplicationAttemptId</code> of the application to which the
-   *         <code>Container</code> was assigned
-   */
-  @Public
-  @Stable
-  public abstract ApplicationAttemptId getApplicationAttemptId();
-
-  @Private
-  @Unstable
-  protected abstract void setApplicationAttemptId(ApplicationAttemptId atId);
-
-  /**
-   * Get the identifier of the <code>TajoContainerId</code>.
-   * @return identifier of the <code>TajoContainerId</code>
-   */
-  @Public
-  @Stable
-  public abstract int getId();
-
-  @Private
-  @Unstable
-  protected abstract void setId(int id);
-
-
-  // TODO: fail the app submission if attempts are more than 10 or something
-  private static final ThreadLocal<NumberFormat> appAttemptIdFormat =
-    new ThreadLocal<NumberFormat>() {
-      @Override
-      public NumberFormat initialValue() {
-        NumberFormat fmt = NumberFormat.getInstance();
-        fmt.setGroupingUsed(false);
-        fmt.setMinimumIntegerDigits(2);
-        return fmt;
-      }
-    };
-  // TODO: Why thread local?
-  // ^ NumberFormat instances are not threadsafe
-  private static final ThreadLocal<NumberFormat> containerIdFormat =
-    new ThreadLocal<NumberFormat>() {
-      @Override
-      public NumberFormat initialValue() {
-        NumberFormat fmt = NumberFormat.getInstance();
-        fmt.setGroupingUsed(false);
-        fmt.setMinimumIntegerDigits(6);
-        return fmt;
-      }
-    };
-
-  @Override
-  public int hashCode() {
-    // Generated by eclipse.
-    final int prime = 435569;
-    int result = 7507;
-    result = prime * result + getId();
-    result = prime * result + getApplicationAttemptId().hashCode();
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    TajoContainerId other = (TajoContainerId) obj;
-    if 
(!this.getApplicationAttemptId().equals(other.getApplicationAttemptId()))
-      return false;
-    if (this.getId() != other.getId())
-      return false;
-    return true;
-  }
-
-  @Override
-  public int compareTo(TajoContainerId other) {
-    if (this.getApplicationAttemptId().compareTo(
-      other.getApplicationAttemptId()) == 0) {
-      return this.getId() - other.getId();
-    } else {
-      return this.getApplicationAttemptId().compareTo(
-        other.getApplicationAttemptId());
-    }
-
-  }
-
-  @Override
-  public String toString() {
-    NumberFormat fmt = NumberFormat.getInstance();
-    fmt.setGroupingUsed(false);
-    fmt.setMinimumIntegerDigits(4);
-
-    StringBuilder sb = new StringBuilder();
-    sb.append("container_");
-    ApplicationId appId = getApplicationAttemptId().getApplicationId();
-    sb.append(appId.getClusterTimestamp()).append("_");
-    sb.append(fmt.format(appId.getId()))
-      .append("_");
-    sb.append(
-      appAttemptIdFormat.get().format(
-        getApplicationAttemptId().getAttemptId())).append("_");
-    sb.append(containerIdFormat.get().format(getId()));
-    return sb.toString();
-  }
-
-  protected abstract void build();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerIdPBImpl.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerIdPBImpl.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerIdPBImpl.java
deleted file mode 100644
index cf9e012..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoContainerIdPBImpl.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.container;
-
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
-
-import com.google.common.base.Preconditions;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.master.container.TajoContainerId;
-
-/**
- * This class is borrowed from the following source code :
- * 
${hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java
- *
- */
-@Private
-@Unstable
-public class TajoContainerIdPBImpl extends TajoContainerId {
-  ContainerProtocol.TajoContainerIdProto proto = null;
-  ContainerProtocol.TajoContainerIdProto.Builder builder = null;
-  private ApplicationAttemptId applicationAttemptId = null;
-
-  public TajoContainerIdPBImpl() {
-    builder = ContainerProtocol.TajoContainerIdProto.newBuilder();
-  }
-
-  public TajoContainerIdPBImpl(ContainerProtocol.TajoContainerIdProto proto) {
-    this.proto = proto;
-    this.applicationAttemptId = 
convertFromProtoFormat(proto.getAppAttemptId());
-  }
-
-  public ContainerProtocol.TajoContainerIdProto getProto() {
-    return proto;
-  }
-
-  @Override
-  public int getId() {
-    Preconditions.checkNotNull(proto);
-    return proto.getId();
-  }
-
-  @Override
-  protected void setId(int id) {
-    Preconditions.checkNotNull(builder);
-    builder.setId((id));
-  }
-
-
-  @Override
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return this.applicationAttemptId;
-  }
-
-  @Override
-  protected void setApplicationAttemptId(ApplicationAttemptId atId) {
-    if (atId != null) {
-      Preconditions.checkNotNull(builder);
-      builder.setAppAttemptId(convertToProtoFormat(atId));
-    }
-    this.applicationAttemptId = atId;
-  }
-
-  private ApplicationAttemptIdPBImpl convertFromProtoFormat(
-    ApplicationAttemptIdProto p) {
-    return new ApplicationAttemptIdPBImpl(p);
-  }
-
-  private ApplicationAttemptIdProto convertToProtoFormat(
-    ApplicationAttemptId t) {
-    return ((ApplicationAttemptIdPBImpl)t).getProto();
-  }
-
-  @Override
-  protected void build() {
-    proto = builder.build();
-    builder = null;
-  }
-}  
-

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java
deleted file mode 100644
index 88c4823..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/container/TajoConverterUtils.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.master.container;
-
-
-import static org.apache.hadoop.yarn.util.StringHelper._split;
-
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-
-
-/**
- * This class is borrowed from the following source code :
- * 
${hadoop-yarn-common}/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
- *
- * This class contains a set of utilities which help converting data structures
- * from/to 'serializableFormat' to/from hadoop/nativejava data structures.
- *
- */
-@Private
-public class TajoConverterUtils {
-
-  public static final String CONTAINER_PREFIX = "container";
-
-  private static ApplicationAttemptId toApplicationAttemptId(
-    Iterator<String> it) throws NumberFormatException {
-    ApplicationId appId = ApplicationId.newInstance(Long.parseLong(it.next()),
-      Integer.parseInt(it.next()));
-    ApplicationAttemptId appAttemptId =
-      ApplicationAttemptId.newInstance(appId, Integer.parseInt(it.next()));
-    return appAttemptId;
-  }
-
-  public static String toString(TajoContainerId cId) {
-    return cId == null ? null : cId.toString();
-  }
-
-  public static TajoContainerId toTajoContainerId(String containerIdStr) {
-    Iterator<String> it = _split(containerIdStr).iterator();
-    if (!it.next().equals(CONTAINER_PREFIX)) {
-      throw new IllegalArgumentException("Invalid TajoContainerId prefix: "
-        + containerIdStr);
-    }
-    try {
-      ApplicationAttemptId appAttemptID = toApplicationAttemptId(it);
-      TajoContainerId containerId =
-        TajoContainerId.newInstance(appAttemptID, Integer.parseInt(it.next()));
-      return containerId;
-    } catch (NumberFormatException n) {
-      throw new IllegalArgumentException("Invalid TajoContainerId: "
-        + containerIdStr, n);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
deleted file mode 100644
index c3a9a59..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.ExecutionBlockId;
-
-public class ContainerAllocationEvent extends 
AbstractEvent<ContainerAllocatorEventType>  {
-
-  private final ExecutionBlockId executionBlockId;
-  private final Priority priority;
-  private final Resource resource;
-  private final boolean isLeafQuery;
-  private final int requiredNum;
-  private final float progress;
-
-  public ContainerAllocationEvent(ContainerAllocatorEventType eventType,
-                                  ExecutionBlockId executionBlockId,
-                                  Priority priority,
-                                  Resource resource,
-                                  int requiredNum,
-                                  boolean isLeafQuery, float progress) {
-    super(eventType);
-    this.executionBlockId = executionBlockId;
-    this.priority = priority;
-    this.resource = resource;
-    this.requiredNum = requiredNum;
-    this.isLeafQuery = isLeafQuery;
-    this.progress = progress;
-  }
-
-  public ExecutionBlockId getExecutionBlockId() {
-    return executionBlockId;
-  }
-
-  public Priority getPriority() {
-    return priority;
-  }
-
-  public int getRequiredNum() {
-    return requiredNum;
-  }
-
-  public boolean isLeafQuery() {
-    return isLeafQuery;
-  }
-
-  public Resource getCapability() {
-    return resource;
-  }
-
-  public float getProgress() {
-    return progress;
-  }
-
-  public Resource getResource() {
-    return resource;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
deleted file mode 100644
index 183aeb5..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-public enum ContainerAllocatorEventType {
-  // producer: TaskAttempt, consumer: ContainerAllocator
-  CONTAINER_REQ,
-  CONTAINER_DEALLOCATE,
-  CONTAINER_FAILED
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java
deleted file mode 100644
index 723ac1a..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerEvent.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.master.event.ContainerEvent.EventType;
-
-public class ContainerEvent extends AbstractEvent<EventType> {
-  public enum EventType {
-    CONTAINER_LAUNCHED,
-    CONTAINER_STOPPED
-  }
-
-  private final ContainerId cId;
-
-  public ContainerEvent(EventType eventType, ContainerId cId) {
-    super(eventType);
-    this.cId = cId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
deleted file mode 100644
index c34b174..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tajo.ExecutionBlockId;
-
-import java.util.Map;
-
-public class GrouppedContainerAllocatorEvent
-    extends ContainerAllocationEvent {
-  private final Map<String, Integer> requestMap;
-
-  public GrouppedContainerAllocatorEvent(ContainerAllocatorEventType eventType,
-                                         ExecutionBlockId executionBlockId,
-                                         Priority priority,
-                                         Resource resource,
-                                         Map<String, Integer> requestMap,
-                                         boolean isLeafQuery, float progress) {
-    super(eventType, executionBlockId, priority,
-        resource, requestMap.size(), isLeafQuery, progress);
-    this.requestMap = requestMap;
-  }
-
-  public Map<String, Integer> getRequestMap() {
-    return this.requestMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
index 5cf9887..5a36ba9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
@@ -20,27 +20,26 @@ package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.master.container.TajoContainerId;
 
 /**
  * This event is sent to a running TaskAttempt on a worker.
  */
 public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> {
   private final TaskAttemptId taskAttemptId;
-  private final TajoContainerId containerId;
+  private final int workerId;
 
-  public LocalTaskEvent(TaskAttemptId taskAttemptId, TajoContainerId containerId,
+  public LocalTaskEvent(TaskAttemptId taskAttemptId, int workerId,
                         LocalTaskEventType eventType) {
     super(eventType);
     this.taskAttemptId = taskAttemptId;
-    this.containerId = containerId;
+    this.workerId = workerId;
   }
 
   public TaskAttemptId getTaskAttemptId() {
     return taskAttemptId;
   }
 
-  public TajoContainerId getContainerId() {
-    return containerId;
+  public int getWorkerId() {
+    return workerId;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
index 3a387fa..9ce7f09 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -21,6 +21,7 @@ package org.apache.tajo.master.event;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.session.Session;
 
 /**
@@ -36,15 +37,17 @@ public class QueryStartEvent extends AbstractEvent {
   private final QueryContext queryContext;
   private final String jsonExpr;
   private final String logicalPlanJson;
+  private final NodeResource allocation;
 
   public QueryStartEvent(QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
-                         String logicalPlanJson) {
+                         String logicalPlanJson, NodeResource allocation) {
     super(EventType.QUERY_START);
     this.queryId = queryId;
     this.session = session;
     this.queryContext = queryContext;
     this.jsonExpr = jsonExpr;
     this.logicalPlanJson = logicalPlanJson;
+    this.allocation = allocation;
   }
 
   public QueryId getQueryId() {
@@ -67,6 +70,10 @@ public class QueryStartEvent extends AbstractEvent {
     return logicalPlanJson;
   }
 
+  public NodeResource getAllocation() {
+    return allocation;
+  }
+
   @Override
   public String toString() {
     return getClass().getName() + "," + getType() + "," + queryId;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java
deleted file mode 100644
index 0d29e44..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.container.TajoContainer;
-
-import java.util.List;
-
-public class StageContainerAllocationEvent extends StageEvent {
-  private List<TajoContainer> allocatedContainer;
-
-  public StageContainerAllocationEvent(final ExecutionBlockId id,
-                                       List<TajoContainer> allocatedContainer) {
-    super(id, StageEventType.SQ_CONTAINER_ALLOCATED);
-    this.allocatedContainer = allocatedContainer;
-  }
-
-  public List<TajoContainer> getAllocatedContainer() {
-    return this.allocatedContainer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
index 763d426..d9beaa8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
@@ -26,9 +26,7 @@ public enum StageEventType {
   // Producer: Query
   SQ_INIT,
   SQ_START,
-  SQ_CONTAINER_ALLOCATED,
   SQ_KILL,
-  SQ_LAUNCH,
 
   // Producer: Task
   SQ_TASK_COMPLETED,

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
index 924fb59..8a3dcb0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java
@@ -19,20 +19,20 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ResourceProtos.ExecutionBlockReport;
 
 /**
  * Event Class: From {@link org.apache.tajo.querymaster.QueryMasterManagerService} to Stage
  */
 public class StageShuffleReportEvent extends StageEvent {
-  private TajoWorkerProtocol.ExecutionBlockReport report;
+  private ExecutionBlockReport report;
 
-  public StageShuffleReportEvent(ExecutionBlockId executionBlockId, TajoWorkerProtocol.ExecutionBlockReport report) {
+  public StageShuffleReportEvent(ExecutionBlockId executionBlockId, ExecutionBlockReport report) {
     super(executionBlockId, StageEventType.SQ_SHUFFLE_REPORT);
     this.report = report;
   }
 
-  public TajoWorkerProtocol.ExecutionBlockReport getReport() {
+  public ExecutionBlockReport getReport() {
     return report;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
index 1611370..08ef805 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -20,23 +20,16 @@ package org.apache.tajo.master.event;
 
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.container.TajoContainerId;
 
 public class TaskAttemptAssignedEvent extends TaskAttemptEvent {
-  private final TajoContainerId cId;
   private final WorkerConnectionInfo workerConnectionInfo;
 
-  public TaskAttemptAssignedEvent(TaskAttemptId id, TajoContainerId cId,
+  public TaskAttemptAssignedEvent(TaskAttemptId id,
                                   WorkerConnectionInfo connectionInfo) {
     super(id, TaskAttemptEventType.TA_ASSIGNED);
-    this.cId = cId;
     this.workerConnectionInfo = connectionInfo;
   }
 
-  public TajoContainerId getContainerId() {
-    return cId;
-  }
-
   public WorkerConnectionInfo getWorkerConnectionInfo(){
     return workerConnectionInfo;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
index e35b154..f59b50b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
@@ -33,6 +33,7 @@ public enum TaskAttemptEventType {
 
   //Producer:Scheduler
   TA_ASSIGNED,
+  TA_ASSIGN_CANCEL,
   TA_SCHEDULE_CANCELED,
 
   //Producer:Scheduler

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
index 8c5f016..a9af288 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskStatusProto;
+import org.apache.tajo.ResourceProtos.TaskStatusProto;
 
 public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
   private final TaskStatusProto status;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
index 5a016fb..6799ce1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
@@ -20,9 +20,8 @@ package org.apache.tajo.master.event;
 
 import com.google.protobuf.RpcCallback;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
 import org.apache.tajo.querymaster.TaskAttempt;
-import org.apache.tajo.master.container.TajoContainerId;
 
 public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent {
   private final TaskAttemptScheduleContext context;
@@ -44,30 +43,13 @@ public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent {
   }
 
   public static class TaskAttemptScheduleContext {
-    private TajoContainerId containerId;
     private String host;
-    private RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback;
+    private RpcCallback<TaskRequestProto> callback;
 
     public TaskAttemptScheduleContext() {
 
     }
 
-    public TaskAttemptScheduleContext(TajoContainerId containerId,
-                                      String host,
-                                      RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback) {
-      this.containerId = containerId;
-      this.host = host;
-      this.callback = callback;
-    }
-
-    public TajoContainerId getContainerId() {
-      return containerId;
-    }
-
-    public void setContainerId(TajoContainerId containerId) {
-      this.containerId = containerId;
-    }
-
     public String getHost() {
       return host;
     }
@@ -76,11 +58,11 @@ public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent {
       this.host = host;
     }
 
-    public RpcCallback<TajoWorkerProtocol.TaskRequestProto> getCallback() {
+    public RpcCallback<TaskRequestProto> getCallback() {
       return callback;
     }
 
-    public void setCallback(RpcCallback<TajoWorkerProtocol.TaskRequestProto> callback) {
+    public void setCallback(RpcCallback<TaskRequestProto> callback) {
       this.callback = callback;
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
index 20204aa..66275b1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
+import org.apache.tajo.ResourceProtos.TaskCompletionReport;
 
 public class TaskCompletionEvent extends TaskAttemptEvent {
   private TaskCompletionReport report;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
index 03888bd..d50fcb8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskFatalErrorReport;
+import org.apache.tajo.ResourceProtos.TaskFatalErrorReport;
 
 public class TaskFatalErrorEvent extends TaskAttemptEvent {
   private final String message;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 3f72ed9..495eaf2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -18,13 +18,10 @@
 
 package org.apache.tajo.master.event;
 
-import com.google.protobuf.RpcCallback;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto;
+import org.apache.tajo.ResourceProtos.AllocationResourceProto;
 import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
-import org.apache.tajo.master.container.TajoContainerId;
 
 public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
 
@@ -32,36 +29,28 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
     TASK_REQ
   }
 
-  private final int workerId;
-  private final TajoContainerId containerId;
+  private final AllocationResourceProto responseProto;
   private final ExecutionBlockId executionBlockId;
-
-  private final RpcCallback<TaskRequestProto> callback;
+  private final int workerId;
 
   public TaskRequestEvent(int workerId,
-                          TajoContainerId containerId,
-                          ExecutionBlockId executionBlockId,
-                          RpcCallback<TaskRequestProto> callback) {
+                          AllocationResourceProto responseProto,
+                          ExecutionBlockId executionBlockId) {
     super(TaskRequestEventType.TASK_REQ);
     this.workerId = workerId;
-    this.containerId = containerId;
+    this.responseProto = responseProto;
     this.executionBlockId = executionBlockId;
-    this.callback = callback;
-  }
-
-  public int getWorkerId() {
-    return this.workerId;
-  }
-
-  public TajoContainerId getContainerId() {
-    return this.containerId;
   }
 
   public ExecutionBlockId getExecutionBlockId() {
     return executionBlockId;
   }
 
-  public RpcCallback<TajoWorkerProtocol.TaskRequestProto> getCallback() {
-    return this.callback;
+  public AllocationResourceProto getResponseProto() {
+    return responseProto;
+  }
+
+  public int getWorkerId() {
+    return workerId;
   }
 }

Reply via email to