http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
deleted file mode 100644
index 90a4eb5..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.protobuf.RpcCallback;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
-import org.apache.tajo.master.QueryInProgress;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.rpc.CancelableRpcCallback;
-import org.apache.tajo.rpc.RpcUtils;
-import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.util.BasicFuture;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-/**
- * It manages all resources of tajo workers.
- */
-public class TajoWorkerResourceManager extends CompositeService implements 
WorkerResourceManager {
-  /** class logger */
-  private static final Log LOG = 
LogFactory.getLog(TajoWorkerResourceManager.class);
-
-  static AtomicInteger containerIdSeq = new AtomicInteger(0);
-
-  private TajoMaster.MasterContext masterContext;
-
-  private TajoRMContext rmContext;
-
-  private String queryIdSeed;
-
-  private WorkerResourceAllocationThread workerResourceAllocator;
-
-  /**
-   * Worker Liveliness monitor
-   */
-  private WorkerLivelinessMonitor workerLivelinessMonitor;
-
-  private final BlockingQueue<WorkerResourceRequest> requestQueue =
-      new LinkedBlockingDeque<WorkerResourceRequest>();
-  private final RpcUtils.Scrutineer<BasicFuture<ClusterResourceSummary>> 
summaryRequest =
-      new RpcUtils.Scrutineer<BasicFuture<ClusterResourceSummary>>();
-
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-
-  private TajoConf systemConf;
-
-  private ConcurrentMap<ContainerProtocol.TajoContainerIdProto, 
AllocatedWorkerResource> allocatedResourceMap = Maps
-    .newConcurrentMap();
-
-  /** It receives status messages from workers and their resources. */
-  private TajoResourceTracker resourceTracker;
-
-  public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
-    super(TajoWorkerResourceManager.class.getSimpleName());
-    this.masterContext = masterContext;
-  }
-
-  public TajoWorkerResourceManager(TajoConf systemConf) {
-    super(TajoWorkerResourceManager.class.getSimpleName());
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    Preconditions.checkArgument(conf instanceof TajoConf);
-    this.systemConf = (TajoConf) conf;
-
-    AsyncDispatcher dispatcher = new AsyncDispatcher();
-    addIfService(dispatcher);
-
-    rmContext = new TajoRMContext(dispatcher);
-
-    this.queryIdSeed = String.valueOf(System.currentTimeMillis());
-
-    workerResourceAllocator = new WorkerResourceAllocationThread();
-    workerResourceAllocator.start();
-
-    this.workerLivelinessMonitor = new 
WorkerLivelinessMonitor(this.rmContext.getDispatcher());
-    addIfService(this.workerLivelinessMonitor);
-
-    // Register event handler for Workers
-    rmContext.getDispatcher().register(WorkerEventType.class, new 
WorkerEventDispatcher(rmContext));
-
-    resourceTracker = new TajoResourceTracker(this, workerLivelinessMonitor);
-    addIfService(resourceTracker);
-
-    super.serviceInit(systemConf);
-  }
-
-  @InterfaceAudience.Private
-  public static final class WorkerEventDispatcher implements 
EventHandler<WorkerEvent> {
-
-    private final TajoRMContext rmContext;
-
-    public WorkerEventDispatcher(TajoRMContext rmContext) {
-      this.rmContext = rmContext;
-    }
-
-    @Override
-    public void handle(WorkerEvent event) {
-      int workerId = event.getWorkerId();
-      Worker node = this.rmContext.getWorkers().get(workerId);
-      if (node != null) {
-        try {
-          node.handle(event);
-        } catch (Throwable t) {
-          LOG.error("Error in handling event type " + event.getType() + " for 
node " + workerId, t);
-        }
-      }
-    }
-  }
-
-  @Override
-  public Map<Integer, Worker> getWorkers() {
-    return ImmutableMap.copyOf(rmContext.getWorkers());
-  }
-
-  @Override
-  public Map<Integer, Worker> getInactiveWorkers() {
-    return ImmutableMap.copyOf(rmContext.getInactiveWorkers());
-  }
-
-  public Collection<Integer> getQueryMasters() {
-    return Collections.unmodifiableSet(rmContext.getQueryMasterWorker());
-  }
-
-  @Override
-  public void serviceStop() throws Exception {
-    if(stopped.get()) {
-      return;
-    }
-    stopped.set(true);
-    if(workerResourceAllocator != null) {
-      workerResourceAllocator.interrupt();
-    }
-
-    super.serviceStop();
-  }
-
-  /**
-   *
-   * @return The prefix of queryId. It is generated when a TajoMaster starts 
up.
-   */
-  @Override
-  public String getSeedQueryId() throws IOException {
-    return queryIdSeed;
-  }
-
-  @VisibleForTesting
-  TajoResourceTracker getResourceTracker() {
-    return resourceTracker;
-  }
-
-  private WorkerResourceAllocationRequest createQMResourceRequest(QueryId 
queryId) {
-    float queryMasterDefaultDiskSlot = masterContext.getConf().getFloatVar(
-      TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
-    int queryMasterDefaultMemoryMB = 
masterContext.getConf().getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
-
-    WorkerResourceAllocationRequest.Builder builder = 
WorkerResourceAllocationRequest.newBuilder();
-    builder.setQueryId(queryId.getProto());
-    builder.setMaxMemoryMBPerContainer(queryMasterDefaultMemoryMB);
-    builder.setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB);
-    builder.setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot);
-    builder.setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot);
-    builder.setResourceRequestPriority(ResourceRequestPriority.MEMORY);
-    builder.setNumContainers(1);
-    return builder.build();
-  }
-
-  @Override
-  public WorkerAllocatedResource allocateQueryMaster(QueryInProgress 
queryInProgress) {
-
-    // 3 seconds, by default
-    long timeout = masterContext.getConf().getTimeVar(
-        TajoConf.ConfVars.TAJO_QUERYMASTER_ALLOCATION_TIMEOUT, 
TimeUnit.MILLISECONDS);
-
-    // Create a resource request for a query master
-    WorkerResourceAllocationRequest qmResourceRequest = 
createQMResourceRequest(queryInProgress.getQueryId());
-
-    // call future for async call
-    final CancelableRpcCallback<WorkerResourceAllocationResponse> callFuture =
-        new CancelableRpcCallback<WorkerResourceAllocationResponse>() {
-          @Override
-          protected void cancel(WorkerResourceAllocationResponse canceled) {
-            if (canceled != null && 
!canceled.getWorkerAllocatedResourceList().isEmpty()) {
-              LOG.info("Canceling resources allocated");
-              WorkerAllocatedResource resource = 
canceled.getWorkerAllocatedResource(0);
-              releaseWorkerResource(resource.getContainerId());
-            }
-          }
-        };
-    allocateWorkerResources(qmResourceRequest, callFuture);
-
-    WorkerResourceAllocationResponse response = null;
-    try {
-      response = callFuture.get(timeout, TimeUnit.MILLISECONDS);
-    } catch (Throwable t) {
-      response = callFuture.cancel(); // try cancel
-      if (response == null) {
-        // canceled successfuly
-        LOG.warn("Got exception waiting resources for query master " + 
queryInProgress.getQueryId(), t);
-        return null;
-      }
-    }
-
-    if (response == null || response.getWorkerAllocatedResourceList().size() 
== 0) {
-      return null;
-    }
-
-    WorkerAllocatedResource resource = response.getWorkerAllocatedResource(0);
-    registerQueryMaster(queryInProgress.getQueryId(), 
resource.getContainerId());
-    return resource;
-  }
-
-  private void registerQueryMaster(QueryId queryId, 
ContainerProtocol.TajoContainerIdProto containerId) {
-    rmContext.getQueryMasterContainer().putIfAbsent(queryId, containerId);
-  }
-
-  @Override
-  public void allocateWorkerResources(WorkerResourceAllocationRequest request,
-                                      
RpcCallback<WorkerResourceAllocationResponse> callBack) {
-    try {
-      //TODO checking queue size
-      requestQueue.put(new WorkerResourceRequest(new 
QueryId(request.getQueryId()), false, request, callBack));
-    } catch (InterruptedException e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  static class WorkerResourceRequest {
-    boolean queryMasterRequest;
-    QueryId queryId;
-    WorkerResourceAllocationRequest request;
-    RpcCallback<WorkerResourceAllocationResponse> callBack;
-    WorkerResourceRequest(
-      QueryId queryId,
-      boolean queryMasterRequest, WorkerResourceAllocationRequest request,
-      RpcCallback<WorkerResourceAllocationResponse> callBack) {
-      this.queryId = queryId;
-      this.queryMasterRequest = queryMasterRequest;
-      this.request = request;
-      this.callBack = callBack;
-    }
-  }
-
-  static class AllocatedWorkerResource {
-    Worker worker;
-    int allocatedMemoryMB;
-    float allocatedDiskSlots;
-  }
-
-  private static final long QUEUE_POLLING_TIME = 100;
-
-  class WorkerResourceAllocationThread extends Thread {
-    @Override
-    public void run() {
-      LOG.info("WorkerResourceAllocationThread start");
-      while(!stopped.get()) {
-        BasicFuture<ClusterResourceSummary> future = summaryRequest.expire();
-        if (future != null) {
-          future.done(makeClusterResourceSummary());
-        }
-        try {
-          WorkerResourceRequest resourceRequest = requestQueue.poll(
-              QUEUE_POLLING_TIME, TimeUnit.MILLISECONDS);
-          if (resourceRequest == null) {
-            continue;
-          }
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("allocateWorkerResources:" +
-              (new QueryId(resourceRequest.request.getQueryId())) +
-              ", requiredMemory:" + 
resourceRequest.request.getMinMemoryMBPerContainer() +
-              "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
-              ", requiredContainers:" + 
resourceRequest.request.getNumContainers() +
-              ", requiredDiskSlots:" + 
resourceRequest.request.getMinDiskSlotPerContainer() +
-              "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
-              ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
-              ", liveWorkers=" + rmContext.getWorkers().size());
-          }
-
-          // TajoWorkerResourceManager can't return allocated disk slots 
occasionally.
-          // Because the rest resource request can remains after QueryMaster 
stops.
-          // Thus we need to find whether QueryId stopped or not.
-          if 
(!rmContext.getStoppedQueryIds().contains(resourceRequest.queryId)) {
-            List<AllocatedWorkerResource> allocatedWorkerResources = 
chooseWorkers(resourceRequest);
-
-            if(allocatedWorkerResources.size() > 0) {
-              List<WorkerAllocatedResource> allocatedResources =
-                new ArrayList<WorkerAllocatedResource>();
-
-              for(AllocatedWorkerResource allocatedResource: 
allocatedWorkerResources) {
-                NodeId nodeId = 
NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(),
-                  
allocatedResource.worker.getConnectionInfo().getPeerRpcPort());
-
-                TajoWorkerContainerId containerId = new 
TajoWorkerContainerId();
-
-                containerId.setApplicationAttemptId(
-                  
ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
-                containerId.setId(containerIdSeq.incrementAndGet());
-
-                ContainerProtocol.TajoContainerIdProto containerIdProto = 
containerId.getProto();
-                allocatedResources.add(WorkerAllocatedResource.newBuilder()
-                  .setContainerId(containerIdProto)
-                  
.setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto())
-                  .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
-                  .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
-                  .build());
-
-
-                allocatedResourceMap.putIfAbsent(containerIdProto, 
allocatedResource);
-              }
-
-              
resourceRequest.callBack.run(WorkerResourceAllocationResponse.newBuilder()
-                  .setQueryId(resourceRequest.request.getQueryId())
-                  .addAllWorkerAllocatedResource(allocatedResources)
-                  .build()
-              );
-
-            } else {
-              if(LOG.isDebugEnabled()) {
-                LOG.debug("=========================================");
-                LOG.debug("Available Workers");
-                for(Worker worker: rmContext.getWorkers().values()) {
-                  LOG.debug(worker.toString());
-                }
-                LOG.debug("=========================================");
-              }
-              requestQueue.put(resourceRequest);
-              Thread.sleep(QUEUE_POLLING_TIME);
-            }
-          }
-        } catch(InterruptedException ie) {
-          LOG.error(ie);
-        } catch (Throwable t) {
-          LOG.error(t, t);
-        }
-      }
-    }
-  }
-
-  private static final long MAX_WAIT_TIME = 10000;
-
-  public ClusterResourceSummary getClusterResourceSummary() {
-    BasicFuture<ClusterResourceSummary> future =
-        summaryRequest.check(new BasicFuture<ClusterResourceSummary>());
-    try {
-      return future.get(MAX_WAIT_TIME, TimeUnit.MILLISECONDS);
-    } catch (Exception e) {
-      LOG.warn("Failed to get cluster summary by exception", e);
-    }
-    return null;
-  }
-
-  private ClusterResourceSummary makeClusterResourceSummary() {
-
-    int totalDiskSlots = 0;
-    int totalCpuCoreSlots = 0;
-    int totalMemoryMB = 0;
-
-    int totalAvailableDiskSlots = 0;
-    int totalAvailableCpuCoreSlots = 0;
-    int totalAvailableMemoryMB = 0;
-
-    for(Worker worker: rmContext.getWorkers().values()) {
-
-      WorkerResource resource = worker.getResource();
-
-      totalMemoryMB += resource.getMemoryMB();
-      totalAvailableMemoryMB += resource.getAvailableMemoryMB();
-
-      totalDiskSlots += resource.getDiskSlots();
-      totalAvailableDiskSlots += resource.getAvailableDiskSlots();
-
-      totalCpuCoreSlots += resource.getCpuCoreSlots();
-      totalAvailableCpuCoreSlots += resource.getAvailableCpuCoreSlots();
-    }
-
-    return ClusterResourceSummary.newBuilder()
-        .setNumWorkers(rmContext.getWorkers().size())
-        .setTotalCpuCoreSlots(totalCpuCoreSlots)
-        .setTotalDiskSlots(totalDiskSlots)
-        .setTotalMemoryMB(totalMemoryMB)
-        .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots)
-        .setTotalAvailableDiskSlots(totalAvailableDiskSlots)
-        .setTotalAvailableMemoryMB(totalAvailableMemoryMB)
-        .build();
-  }
-
-  private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest 
resourceRequest) {
-    List<AllocatedWorkerResource> selectedWorkers = new 
ArrayList<AllocatedWorkerResource>();
-
-    int allocatedResources = 0;
-
-    ResourceRequestPriority resourceRequestPriority
-      = resourceRequest.request.getResourceRequestPriority();
-
-    List<Worker> randomWorkers = new 
ArrayList<Worker>(rmContext.getWorkers().values());
-    Collections.shuffle(randomWorkers);
-
-    if(resourceRequestPriority == ResourceRequestPriority.MEMORY) {
-
-      int numContainers = resourceRequest.request.getNumContainers();
-      int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
-      int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
-      float diskSlot = 
Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
-          resourceRequest.request.getMinDiskSlotPerContainer());
-
-      int liveWorkerSize = randomWorkers.size();
-      Set<Integer> insufficientWorkers = new HashSet<Integer>();
-      boolean stop = false;
-      boolean checkMax = true;
-      while(!stop) {
-        if(allocatedResources >= numContainers) {
-          break;
-        }
-
-        if(insufficientWorkers.size() >= liveWorkerSize) {
-          if(!checkMax) {
-            break;
-          }
-          insufficientWorkers.clear();
-          checkMax = false;
-        }
-        int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;
-
-        for(Worker worker: randomWorkers) {
-          if(allocatedResources >= numContainers) {
-            stop = true;
-            break;
-          }
-
-          if(insufficientWorkers.size() >= liveWorkerSize) {
-            break;
-          }
-
-          WorkerResource workerResource = worker.getResource();
-          if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) {
-            int workerMemory;
-            if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) {
-              workerMemory = maxMemoryMB;
-            } else {
-              workerMemory = workerResource.getAvailableMemoryMB();
-            }
-            AllocatedWorkerResource allocatedWorkerResource = new 
AllocatedWorkerResource();
-            allocatedWorkerResource.worker = worker;
-            allocatedWorkerResource.allocatedMemoryMB = workerMemory;
-            if(workerResource.getAvailableDiskSlots() >= diskSlot) {
-              allocatedWorkerResource.allocatedDiskSlots = diskSlot;
-            } else {
-              allocatedWorkerResource.allocatedDiskSlots = 
workerResource.getAvailableDiskSlots();
-            }
-
-            
workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
-                allocatedWorkerResource.allocatedMemoryMB);
-
-            selectedWorkers.add(allocatedWorkerResource);
-
-            allocatedResources++;
-          } else {
-            insufficientWorkers.add(worker.getWorkerId());
-          }
-        }
-      }
-    } else {
-      int numContainers = resourceRequest.request.getNumContainers();
-      float minDiskSlots = 
resourceRequest.request.getMinDiskSlotPerContainer();
-      float maxDiskSlots = 
resourceRequest.request.getMaxDiskSlotPerContainer();
-      int memoryMB = 
Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
-          resourceRequest.request.getMinMemoryMBPerContainer());
-
-      int liveWorkerSize = randomWorkers.size();
-      Set<Integer> insufficientWorkers = new HashSet<Integer>();
-      boolean stop = false;
-      boolean checkMax = true;
-      while(!stop) {
-        if(allocatedResources >= numContainers) {
-          break;
-        }
-
-        if(insufficientWorkers.size() >= liveWorkerSize) {
-          if(!checkMax) {
-            break;
-          }
-          insufficientWorkers.clear();
-          checkMax = false;
-        }
-        float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;
-
-        for(Worker worker: randomWorkers) {
-          if(allocatedResources >= numContainers) {
-            stop = true;
-            break;
-          }
-
-          if(insufficientWorkers.size() >= liveWorkerSize) {
-            break;
-          }
-
-          WorkerResource workerResource = worker.getResource();
-          if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) {
-            float workerDiskSlots;
-            if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) {
-              workerDiskSlots = maxDiskSlots;
-            } else {
-              workerDiskSlots = workerResource.getAvailableDiskSlots();
-            }
-            AllocatedWorkerResource allocatedWorkerResource = new 
AllocatedWorkerResource();
-            allocatedWorkerResource.worker = worker;
-            allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots;
-
-            if(workerResource.getAvailableMemoryMB() >= memoryMB) {
-              allocatedWorkerResource.allocatedMemoryMB = memoryMB;
-            } else {
-              allocatedWorkerResource.allocatedMemoryMB = 
workerResource.getAvailableMemoryMB();
-            }
-            
workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
-                allocatedWorkerResource.allocatedMemoryMB);
-
-            selectedWorkers.add(allocatedWorkerResource);
-
-            allocatedResources++;
-          } else {
-            insufficientWorkers.add(worker.getWorkerId());
-          }
-        }
-      }
-    }
-    return selectedWorkers;
-  }
-
-  /**
-   * Release allocated resource.
-   *
-   * @param containerId ContainerIdProto to be released
-   */
-  @Override
-  public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto 
containerId) {
-    AllocatedWorkerResource allocated = 
allocatedResourceMap.remove(containerId);
-    if(allocated != null) {
-      LOG.info("Release Resource: " + allocated.allocatedDiskSlots + "," + 
allocated.allocatedMemoryMB);
-      allocated.worker.getResource().releaseResource( 
allocated.allocatedDiskSlots, allocated.allocatedMemoryMB);
-    } else {
-      LOG.warn("No AllocatedWorkerResource data for [" + containerId + "]");
-      return;
-    }
-  }
-
-  @Override
-  public boolean isQueryMasterStopped(QueryId queryId) {
-    return !rmContext.getQueryMasterContainer().containsKey(queryId);
-  }
-
-  @Override
-  public void releaseQueryMaster(QueryId queryId) {
-    if(!rmContext.getQueryMasterContainer().containsKey(queryId)) {
-      LOG.warn("No QueryMaster resource info for " + queryId);
-      return;
-    } else {
-      ContainerProtocol.TajoContainerIdProto containerId = 
rmContext.getQueryMasterContainer().remove(queryId);
-      releaseWorkerResource(containerId);
-      rmContext.getStoppedQueryIds().add(queryId);
-      LOG.info(String.format("Released QueryMaster (%s) resource." , 
queryId.toString()));
-    }
-  }
-
-  public TajoRMContext getRMContext() {
-    return rmContext;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
deleted file mode 100644
index 6535688..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-
-import java.util.EnumSet;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * It contains resource and various information for a worker.
- */
-public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
-  /** class logger */
-  private static final Log LOG = LogFactory.getLog(Worker.class);
-
-  private final ReentrantReadWriteLock.ReadLock readLock;
-  private final ReentrantReadWriteLock.WriteLock writeLock;
-
-  /** context of {@link org.apache.tajo.master.rm.TajoWorkerResourceManager} */
-  private final TajoRMContext rmContext;
-
-  /** last heartbeat time */
-  private long lastHeartbeatTime;
-
-  /** Resource capability */
-  private final WorkerResource resource;
-
-  /** Worker connection information */
-  private WorkerConnectionInfo connectionInfo;
-
-  private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new 
ReconnectNodeTransition();
-  private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new 
StatusUpdateTransition();
-
-  private static final StateMachineFactory<Worker,
-      WorkerState,
-      WorkerEventType,
-      WorkerEvent> stateMachineFactory
-      = new StateMachineFactory<Worker,
-      WorkerState,
-      WorkerEventType,
-      WorkerEvent>(WorkerState.NEW)
-
-      // Transition from NEW
-      .addTransition(WorkerState.NEW, WorkerState.RUNNING,
-          WorkerEventType.STARTED,
-          new AddNodeTransition())
-
-      // Transition from RUNNING
-      .addTransition(WorkerState.RUNNING, EnumSet.of(WorkerState.RUNNING, 
WorkerState.UNHEALTHY),
-          WorkerEventType.STATE_UPDATE,
-          STATUS_UPDATE_TRANSITION)
-      .addTransition(WorkerState.RUNNING, WorkerState.LOST,
-          WorkerEventType.EXPIRE,
-          new DeactivateNodeTransition(WorkerState.LOST))
-      .addTransition(WorkerState.RUNNING, WorkerState.RUNNING,
-          WorkerEventType.RECONNECTED,
-          RECONNECT_NODE_TRANSITION)
-
-      // Transitions from UNHEALTHY state
-      .addTransition(WorkerState.UNHEALTHY, EnumSet.of(WorkerState.RUNNING, 
WorkerState.UNHEALTHY),
-          WorkerEventType.STATE_UPDATE,
-          STATUS_UPDATE_TRANSITION)
-      .addTransition(WorkerState.UNHEALTHY, WorkerState.LOST,
-          WorkerEventType.EXPIRE,
-          new DeactivateNodeTransition(WorkerState.LOST))
-      .addTransition(WorkerState.UNHEALTHY, WorkerState.UNHEALTHY,
-          WorkerEventType.RECONNECTED,
-          RECONNECT_NODE_TRANSITION);
-
-  private final StateMachine<WorkerState, WorkerEventType, WorkerEvent> 
stateMachine =
-      stateMachineFactory.make(this, WorkerState.NEW);
-
-  public Worker(TajoRMContext rmContext, WorkerResource resource, 
WorkerConnectionInfo connectionInfo) {
-    this.rmContext = rmContext;
-
-    this.connectionInfo = connectionInfo;
-    this.lastHeartbeatTime = System.currentTimeMillis();
-    this.resource = resource;
-
-    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    this.readLock = lock.readLock();
-    this.writeLock = lock.writeLock();
-  }
-
-  public int getWorkerId() {
-    return connectionInfo.getId();
-  }
-
-  public WorkerConnectionInfo getConnectionInfo() {
-    return connectionInfo;
-  }
-
-  public void setLastHeartbeatTime(long lastheartbeatReportTime) {
-    this.writeLock.lock();
-
-    try {
-      this.lastHeartbeatTime = lastheartbeatReportTime;
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-
-  public long getLastHeartbeatTime() {
-    this.readLock.lock();
-
-    try {
-      return this.lastHeartbeatTime;
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
-  /**
-   *
-   * @return the current state of worker
-   */
-  public WorkerState getState() {
-    this.readLock.lock();
-
-    try {
-      return this.stateMachine.getCurrentState();
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
-  /**
-   *
-   * @return the current resource capability of worker
-   */
-  public WorkerResource getResource() {
-    return this.resource;
-  }
-
-  @Override
-  public int compareTo(Worker o) {
-    if(o == null) {
-      return 1;
-    }
-    return connectionInfo.compareTo(o.connectionInfo);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    Worker worker = (Worker) o;
-
-    if (lastHeartbeatTime != worker.lastHeartbeatTime) return false;
-    if (connectionInfo != null ? !connectionInfo.equals(worker.connectionInfo) 
: worker.connectionInfo != null)
-      return false;
-    if (readLock != null ? !readLock.equals(worker.readLock) : worker.readLock 
!= null) return false;
-    if (resource != null ? !resource.equals(worker.resource) : worker.resource 
!= null) return false;
-    if (rmContext != null ? !rmContext.equals(worker.rmContext) : 
worker.rmContext != null) return false;
-    if (stateMachine != null ? !stateMachine.equals(worker.stateMachine) : 
worker.stateMachine != null) return false;
-    if (writeLock != null ? !writeLock.equals(worker.writeLock) : 
worker.writeLock != null) return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = readLock != null ? readLock.hashCode() : 0;
-    result = 31 * result + (writeLock != null ? writeLock.hashCode() : 0);
-    result = 31 * result + (rmContext != null ? rmContext.hashCode() : 0);
-    result = 31 * result + (int) (lastHeartbeatTime ^ (lastHeartbeatTime >>> 
32));
-    result = 31 * result + (resource != null ? resource.hashCode() : 0);
-    result = 31 * result + (connectionInfo != null ? connectionInfo.hashCode() 
: 0);
-    result = 31 * result + (stateMachine != null ? stateMachine.hashCode() : 
0);
-    return result;
-  }
-
-  public static class AddNodeTransition implements SingleArcTransition<Worker, 
WorkerEvent> {
-    @Override
-    public void transition(Worker worker, WorkerEvent workerEvent) {
-
-      worker.rmContext.getQueryMasterWorker().add(worker.getWorkerId());
-      LOG.info("Worker with " + worker.getResource() + " is joined to Tajo 
cluster");
-    }
-  }
-
-  public static class StatusUpdateTransition implements
-      MultipleArcTransition<Worker, WorkerEvent, WorkerState> {
-
-    @Override
-    public WorkerState transition(Worker worker, WorkerEvent event) {
-      if (!(event instanceof WorkerStatusEvent)) {
-        throw new IllegalArgumentException("event should be a 
WorkerStatusEvent type.");
-      }
-      WorkerStatusEvent statusEvent = (WorkerStatusEvent) event;
-      worker.updateStatus(statusEvent);
-
-      return WorkerState.RUNNING;
-    }
-  }
-
-  private void updateStatus(WorkerStatusEvent statusEvent) {
-    this.writeLock.lock();
-
-    try {
-      lastHeartbeatTime = System.currentTimeMillis();
-      resource.setNumRunningTasks(statusEvent.getRunningTaskNum());
-      resource.setMaxHeap(statusEvent.maxHeap());
-      resource.setFreeHeap(statusEvent.getFreeHeap());
-      resource.setTotalHeap(statusEvent.getTotalHeap());
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-
-  public static class DeactivateNodeTransition implements 
SingleArcTransition<Worker, WorkerEvent> {
-    private final WorkerState finalState;
-
-    public DeactivateNodeTransition(WorkerState finalState) {
-      this.finalState = finalState;
-    }
-
-    @Override
-    public void transition(Worker worker, WorkerEvent workerEvent) {
-
-      worker.rmContext.getWorkers().remove(worker.getWorkerId());
-      LOG.info("Deactivating Node " + worker.getWorkerId() + " as it is now " 
+ finalState);
-      worker.rmContext.getInactiveWorkers().putIfAbsent(worker.getWorkerId(), 
worker);
-    }
-  }
-
-  public static class ReconnectNodeTransition implements 
SingleArcTransition<Worker, WorkerEvent> {
-
-    @Override
-    public void transition(Worker worker, WorkerEvent workerEvent) {
-      if (!(workerEvent instanceof WorkerReconnectEvent)) {
-        throw new IllegalArgumentException("workerEvent should be a 
WorkerReconnectEvent type.");
-      }
-      WorkerReconnectEvent castedEvent = (WorkerReconnectEvent) workerEvent;
-
-      Worker newWorker = castedEvent.getWorker();
-      worker.rmContext.getWorkers().put(castedEvent.getWorkerId(), newWorker);
-      worker.rmContext.getDispatcher().getEventHandler().handle(
-          new WorkerEvent(worker.getWorkerId(), WorkerEventType.STARTED));
-    }
-  }
-
-  @Override
-  public void handle(WorkerEvent event) {
-    LOG.debug("Processing " + event.getWorkerId() + " of type " + 
event.getType());
-    try {
-      writeLock.lock();
-      WorkerState oldState = getState();
-      try {
-        stateMachine.doTransition(event.getType(), event);
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state"
-            + ", eventType:" + event.getType().name()
-            + ", oldState:" + oldState.name()
-            + ", nextState:" + getState().name()
-            , e);
-        LOG.error("Invalid event " + event.getType() + " on Worker  " + 
getWorkerId());
-      }
-      if (oldState != getState()) {
-        LOG.info(getWorkerId() + " Node Transitioned from " + oldState + " to 
" + getState());
-      }
-    }
-
-    finally {
-      writeLock.unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
deleted file mode 100644
index c208990..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-/**
- * WorkerEvent describes all kinds of events which sent to {@link Worker}.
- */
-public class WorkerEvent extends AbstractEvent<WorkerEventType> {
-  private final int workerId;
-
-  public WorkerEvent(int workerId, WorkerEventType workerEventType) {
-    super(workerEventType);
-    this.workerId = workerId;
-  }
-
-  public int getWorkerId() {
-    return workerId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
deleted file mode 100644
index 0c97654..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEventType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-public enum WorkerEventType {
-
-  /** Source : {@link TajoResourceTracker}, Destination: {@link Worker} */
-  STARTED,
-  STATE_UPDATE,
-  RECONNECTED,
-
-  /** Source : {@link WorkerLivelinessMonitor}, Destination: {@link Worker} */
-  EXPIRE
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
deleted file mode 100644
index 2751886..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tajo.conf.TajoConf;
-
-/**
- * It periodically checks the latest heartbeat time of {@link Worker}.
- * If the latest heartbeat time is expired, it produces EXPIRE event to a corresponding {@link Worker}.
- */
-public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<Integer> {
-
-  private EventHandler dispatcher;
-
-  public WorkerLivelinessMonitor(Dispatcher d) {
-    super(WorkerLivelinessMonitor.class.getSimpleName(), new SystemClock());
-    this.dispatcher = d.getEventHandler();
-  }
-
-  public void serviceInit(Configuration conf) throws Exception {
-    Preconditions.checkArgument(conf instanceof TajoConf);
-    TajoConf systemConf = (TajoConf) conf;
-    // milliseconds
-    int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.RESOURCE_TRACKER_HEARTBEAT_TIMEOUT);
-    setExpireInterval(expireIntvl);
-    setMonitorInterval(expireIntvl/3);
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void expire(Integer id) {
-    dispatcher.handle(new WorkerEvent(id, WorkerEventType.EXPIRE));
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
deleted file mode 100644
index 3828b6a..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-/**
- * {@link TajoResourceTracker} produces this event, and it's destination is {@link Worker}.
- * This event occurs only when an inactive worker sends a ping again.
- */
-public class WorkerReconnectEvent extends WorkerEvent {
-  private final Worker worker;
-  public WorkerReconnectEvent(int workerId, Worker worker) {
-    super(workerId, WorkerEventType.RECONNECTED);
-    this.worker = worker;
-  }
-
-  public Worker getWorker() {
-    return worker;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
deleted file mode 100644
index 3d5e062..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.service.Service;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceAllocationRequest;
-import org.apache.tajo.master.QueryInProgress;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * An interface of WorkerResourceManager which allows TajoMaster to request allocation for containers
- * and release the allocated containers.
- */
-public interface WorkerResourceManager extends Service {
-
-  /**
-   * Request a resource container for a QueryMaster.
-   *
-   * @param queryInProgress QueryInProgress
-   * @return A allocated container resource
-   */
-  @Deprecated
-  public QueryCoordinatorProtocol.WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
-
-  /**
-   * Request one or more resource containers. You can set the number of containers and resource capabilities, such as
-   * memory, CPU cores, and disk slots. This is an asynchronous call. You should use a callback to get allocated
-   * resource containers. Each container is identified {@link org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto}.
-   *
-   * @param request Request description
-   * @param rpcCallBack Callback function
-   */
-  public void allocateWorkerResources(WorkerResourceAllocationRequest request,
-      RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> rpcCallBack);
-
-  /**
-   * Release a container
-   *
-   * @param containerId ContainerIdProto to be released
-   */
-  public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId);
-
-  public String getSeedQueryId() throws IOException;
-
-  /**
-   * Check if a query master is stopped.
-   *
-   * @param queryId QueryId to be checked
-   * @return True if QueryMaster is stopped
-   */
-  public boolean isQueryMasterStopped(QueryId queryId);
-
-  /**
-   * Stop a query master
-   *
-   * @param queryId QueryId to be stopped
-   */
-  public void releaseQueryMaster(QueryId queryId);
-
-  /**
-   *
-   * @return a Map instance containing active workers
-   */
-  public Map<Integer, Worker> getWorkers();
-
-  /**
-   *
-   * @return a Map instance containing inactive workers
-   */
-  public Map<Integer, Worker> getInactiveWorkers();
-
-  public void stop();
-
-  /**
-   *
-   * @return The overall summary of cluster resources
-   */
-  public ClusterResourceSummary getClusterResourceSummary();
-
-  /**
-   *
-   * @return WorkerIds on which QueryMasters are running
-   */
-  public Collection<Integer> getQueryMasters();
-
-  /**
-   *
-   * @return RMContext
-   */
-  public TajoRMContext getRMContext();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java
deleted file mode 100644
index a941008..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerState.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-/**
- * It presents the states of {@link Worker}.
- */
-public enum WorkerState {
-  /** New worker */
-  NEW,
-
-  /** Running worker */
-  RUNNING,
-
-  /** Worker is unhealthy */
-  UNHEALTHY,
-
-  /** worker is out of service */
-  DECOMMISSIONED,
-
-  /** worker has not sent a heartbeat for some configured time threshold */
-  LOST;
-
-  @SuppressWarnings("unused")
-  public boolean isUnusable() {
-    return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
deleted file mode 100644
index f1ab401..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-/**
- * {@link TajoResourceTracker} produces this event, and its destination is
- * {@link org.apache.tajo.master.rm.Worker.StatusUpdateTransition} of {@link Worker}.
- */
-public class WorkerStatusEvent extends WorkerEvent {
-  private final int runningTaskNum;
-  private final long maxHeap;
-  private final long freeHeap;
-  private final long totalHeap;
-
-  public WorkerStatusEvent(int workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
-    super(workerId, WorkerEventType.STATE_UPDATE);
-    this.runningTaskNum = runningTaskNum;
-    this.maxHeap = maxHeap;
-    this.freeHeap = freeHeap;
-    this.totalHeap = totalHeap;
-  }
-
-  public int getRunningTaskNum() {
-    return runningTaskNum;
-  }
-
-  public long maxHeap() {
-    return maxHeap;
-  }
-
-  public long getFreeHeap() {
-    return freeHeap;
-  }
-
-  public long getTotalHeap() {
-    return totalHeap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/AbstractQueryScheduler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/AbstractQueryScheduler.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/AbstractQueryScheduler.java
new file mode 100644
index 0000000..1380417
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/AbstractQueryScheduler.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.resource.ResourceCalculator;
+
+/**
+ * Please refer to {@link TajoResourceScheduler} for detailed information.
+ */
+public abstract class AbstractQueryScheduler extends AbstractService implements TajoResourceScheduler {
+
+  protected final NodeResource clusterResource;
+  protected final NodeResource minResource;
+  protected final NodeResource maxResource;
+  protected final NodeResource qmMinResource;
+
+  public AbstractQueryScheduler(String name) {
+    super(name);
+    this.minResource = NodeResources.createResource(0);
+    this.qmMinResource = NodeResources.createResource(0);
+    this.maxResource = NodeResources.createResource(0);
+    this.clusterResource = NodeResources.createResource(0);
+  }
+
+  @Override
+  public NodeResource getClusterResource() {
+    return clusterResource;
+  }
+
+  @Override
+  public NodeResource getMinimumResourceCapability() {
+    return minResource;
+  }
+
+  @Override
+  public NodeResource getMaximumResourceCapability() {
+    return maxResource;
+  }
+
+  @Override
+  public NodeResource getQMMinimumResourceCapability() {
+    return qmMinResource;
+  }
+
+  public abstract int getRunningQuery();
+
+  public abstract ResourceCalculator getResourceCalculator();
+
+  public abstract void submitQuery(QuerySchedulingInfo schedulingInfo);
+
+  public abstract void stopQuery(QueryId queryId);
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
index 3dd3389..a1fe743 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QuerySchedulingInfo.java
@@ -20,13 +20,27 @@ package org.apache.tajo.master.scheduler;
 
 import com.google.common.base.Objects;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.util.NumberUtil;
 
-public class QuerySchedulingInfo {
+/**
+ * A QuerySchedulingInfo represents the scheduling information of a query.
+ * It provides a common interface for queue and priority.
+ */
+
+public class QuerySchedulingInfo implements Comparable<QuerySchedulingInfo> {
+  /** Name of queue */
+  private String queue;
+  /** Query owner */
+  private String user;
   private QueryId queryId;
-  private Integer priority;
-  private Long startTime;
+  /** Query priority for queries in same queue */
+  private int priority;
+  /** Start time for query in same queue */
+  private long startTime;
 
-  public QuerySchedulingInfo(QueryId queryId, Integer priority, Long startTime) {
+  public QuerySchedulingInfo(String queue, String user, QueryId queryId, int priority, long startTime) {
+    this.queue = queue;
+    this.user = user;
     this.queryId = queryId;
     this.priority = priority;
     this.startTime = startTime;
@@ -36,11 +50,15 @@ public class QuerySchedulingInfo {
     return queryId;
   }
 
-  public Integer getPriority() {
+  public String getUser() {
+    return user;
+  }
+
+  public int getPriority() {
     return priority;
   }
 
-  public Long getStartTime() {
+  public long getStartTime() {
     return startTime;
   }
 
@@ -48,8 +66,36 @@ public class QuerySchedulingInfo {
     return queryId.getId();
   }
 
+  public String getQueue() {
+    return queue;
+  }
+
+
+  @Override
+  public int compareTo(QuerySchedulingInfo o) {
+    int ret = NumberUtil.compare(priority, o.priority);
+    if(ret == 0) {
+      ret = NumberUtil.compare(startTime, o.startTime);
+    }
+    return ret;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    QuerySchedulingInfo other = (QuerySchedulingInfo) obj;
+    if (!this.getQueryId().equals(other.getQueryId()))
+      return false;
+    return true;
+  }
+
   @Override
   public int hashCode() {
-    return Objects.hashCode(startTime, getName(), priority);
+    return Objects.hashCode(queryId, queue, user, priority, startTime);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueInfo.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueInfo.java 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueInfo.java
new file mode 100644
index 0000000..acf793c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueInfo.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import java.lang.String;import java.util.List;
+
+/**
+ * QueueInfo is a report of the runtime information of the queue.
+ * <p>
+ * It includes information such as:
+ * <ul>
+ *   <li>Queue name.</li>
+ *   <li>Capacity of the queue.</li>
+ *   <li>Maximum capacity of the queue.</li>
+ *   <li>Current capacity of the queue.</li>
+ *   <li>Child queues.</li>
+ *   <li>Running applications.</li>
+ *   <li>{@link QueueState} of the queue.</li>
+ * </ul>
+ *
+ */
+
+public abstract class QueueInfo {
+  /**
+   * Get the <em>name</em> of the queue.
+   * @return <em>name</em> of the queue
+   */
+  public abstract String getQueueName();
+
+  public abstract void setQueueName(String queueName);
+
+  /**
+   * Get the <em>configured capacity</em> of the queue.
+   * @return <em>configured capacity</em> of the queue
+   */
+  public abstract float getCapacity();
+
+  public abstract void setCapacity(float capacity);
+  
+  /**
+   * Get the <em>maximum capacity</em> of the queue.
+   * @return <em>maximum capacity</em> of the queue
+   */
+
+  public abstract float getMaximumCapacity();
+
+  public abstract void setMaximumCapacity(float maximumCapacity);
+
+  /**
+   * Get the <em>maximum query capacity</em> of the queue.
+   * @return <em>maximum query capacity</em> of the queue
+   */
+
+  public abstract float getMaximumQueryCapacity();
+
+  public abstract void setMaximumQueryCapacity(float maximumQueryCapacity);
+
+  /**
+   * Get the <em>current capacity</em> of the queue.
+   * @return <em>current capacity</em> of the queue
+   */
+
+  public abstract float getCurrentCapacity();
+
+  public abstract void setCurrentCapacity(float currentCapacity);
+  
+  /**
+   * Get the <em>child queues</em> of the queue.
+   * @return <em>child queues</em> of the queue
+   */
+
+  public abstract List<QueueInfo> getChildQueues();
+
+  public abstract void setChildQueues(List<QueueInfo> childQueues);
+
+  
+  /**
+   * Get the <code>QueueState</code> of the queue.
+   * @return <code>QueueState</code> of the queue
+   */
+  public abstract QueueState getQueueState();
+
+  public abstract void setQueueState(QueueState queueState);
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueState.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueState.java 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueState.java
new file mode 100644
index 0000000..d7acbfe
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/QueueState.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+
+/**
+ * State of a Queue.
+ * <p>
+ * A queue is in one of:
+ * <ul>
+ *   <li>{@link #RUNNING} - normal state.</li>
+ *   <li>{@link #STOPPED} - not accepting new application submissions.</li>
+ * </ul>
+ * 
+ * @see QueueInfo
+ * @see ApplicationClientProtocol#getQueueInfo(org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest)
+ */
+@Public
+@Stable
+public enum QueueState {
+  /**
+   * Stopped - Not accepting submissions of new applications.
+   */
+  STOPPED,
+  
+  /**
+   * Running - normal operation.
+   */
+  RUNNING
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
index 7fd07b5..efe4561 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SchedulingAlgorithms.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.master.scheduler;
 
+import org.apache.tajo.util.NumberUtil;
+
 import java.util.Comparator;
 
 /**
@@ -32,9 +34,9 @@ public class SchedulingAlgorithms  {
   public static class FifoComparator implements Comparator<QuerySchedulingInfo> {
     @Override
     public int compare(QuerySchedulingInfo q1, QuerySchedulingInfo q2) {
-      int res = q1.getPriority().compareTo(q2.getPriority());
+      int res = NumberUtil.compare(q1.getPriority(), q2.getPriority());
       if (res == 0) {
-        res = (int) Math.signum(q1.getStartTime() - q2.getStartTime());
+        res = NumberUtil.compare(q1.getStartTime(), q2.getStartTime());
       }
       if (res == 0) {
         // In the rare case where jobs were submitted at the exact same time,

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
deleted file mode 100644
index 6cb98eb..0000000
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.scheduler;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.master.QueryInProgress;
-import org.apache.tajo.master.QueryManager;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class SimpleFifoScheduler implements Scheduler {
-  private static final Log LOG = 
LogFactory.getLog(SimpleFifoScheduler.class.getName());
-  private LinkedList<QuerySchedulingInfo> pool = new 
LinkedList<QuerySchedulingInfo>();
-  private final Thread queryProcessor;
-  private AtomicBoolean stopped = new AtomicBoolean();
-  private QueryManager manager;
-  private Comparator<QuerySchedulingInfo> COMPARATOR = new 
SchedulingAlgorithms.FifoComparator();
-
-  public SimpleFifoScheduler(QueryManager manager) {
-    this.manager = manager;
-    this.queryProcessor = new Thread(new QueryProcessor());
-    this.queryProcessor.setName("Query Processor");
-  }
-
-  @Override
-  public Mode getMode() {
-    return Mode.FIFO;
-  }
-
-  @Override
-  public String getName() {
-    return manager.getName();
-  }
-
-  @Override
-  public boolean addQuery(QueryInProgress queryInProgress) {
-    int qSize = pool.size();
-    if (qSize != 0 && qSize % 100 == 0) {
-      LOG.info("Size of Fifo queue is " + qSize);
-    }
-
-    QuerySchedulingInfo querySchedulingInfo = new 
QuerySchedulingInfo(queryInProgress.getQueryId(), 1,
-        queryInProgress.getQueryInfo().getStartTime());
-    boolean result = pool.add(querySchedulingInfo);
-    if (getRunningQueries().size() == 0) wakeupProcessor();
-    return result;
-  }
-
-  @Override
-  public boolean removeQuery(QueryId queryId) {
-    return pool.remove(getQueryByQueryId(queryId));
-  }
-
-  public QuerySchedulingInfo getQueryByQueryId(QueryId queryId) {
-    for (QuerySchedulingInfo querySchedulingInfo : pool) {
-      if (querySchedulingInfo.getQueryId().equals(queryId)) {
-        return querySchedulingInfo;
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public List<QueryInProgress> getRunningQueries() {
-    return new ArrayList<QueryInProgress>(manager.getRunningQueries());
-  }
-
-  public void start() {
-    queryProcessor.start();
-  }
-
-  public void stop() {
-    if (stopped.getAndSet(true)) {
-      return;
-    }
-    pool.clear();
-    synchronized (queryProcessor) {
-      queryProcessor.interrupt();
-    }
-  }
-
-  private QuerySchedulingInfo pollScheduledQuery() {
-    if (pool.size() > 1) {
-      Collections.sort(pool, COMPARATOR);
-    }
-    return pool.poll();
-  }
-
-  private void wakeupProcessor() {
-    synchronized (queryProcessor) {
-      queryProcessor.notifyAll();
-    }
-  }
-
-  private final class QueryProcessor implements Runnable {
-    @Override
-    public void run() {
-
-      QuerySchedulingInfo query;
-
-      while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-        query = null;
-        if (getRunningQueries().size() == 0) {
-          query = pollScheduledQuery();
-        }
-
-        if (query != null) {
-          try {
-            manager.startQueryJob(query.getQueryId());
-          } catch (Throwable t) {
-            LOG.fatal("Exception during query startup:", t);
-            manager.stopQuery(query.getQueryId());
-          }
-        }
-
-        synchronized (queryProcessor) {
-          try {
-            queryProcessor.wait(500);
-          } catch (InterruptedException e) {
-            if (stopped.get()) {
-              break;
-            }
-            LOG.warn("Exception during shutdown: ", e);
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java
new file mode 100644
index 0000000..e41ac95
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleScheduler.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.TajoRMContext;
+import org.apache.tajo.master.rm.NodeStatus;
+import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent;
+import org.apache.tajo.master.scheduler.event.SchedulerEvent;
+import org.apache.tajo.resource.DefaultResourceCalculator;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.resource.ResourceCalculator;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import static org.apache.tajo.ResourceProtos.*;
+
+/**
+ * SimpleScheduler can execute queries and stages simultaneously.
+ * Each query and stage competes with the others for cluster resources.
+ */
+public class SimpleScheduler extends AbstractQueryScheduler {
+
+  private static final Log LOG = LogFactory.getLog(SimpleScheduler.class);
+  private static final float MAXIMUM_RUNNING_QM_RATE = 0.5f;
+  private static final Comparator<QuerySchedulingInfo> COMPARATOR = new 
SchedulingAlgorithms.FifoComparator();
+
+  private volatile boolean isStopped = false;
+  private final TajoMaster.MasterContext masterContext;
+
+  private final TajoRMContext rmContext;
+  private final BlockingQueue<QuerySchedulingInfo> queryQueue;
+  private final Map<QueryId, QuerySchedulingInfo> pendingQueryMap = 
Maps.newHashMap();
+
+  private final Map<QueryId, Integer> assignedQueryMasterMap = 
Maps.newHashMap();
+  private final ResourceCalculator resourceCalculator = new 
DefaultResourceCalculator();
+
+  private final Thread queryProcessor;
+  private TajoConf tajoConf;
+
+  @VisibleForTesting
+  public SimpleScheduler(TajoMaster.MasterContext context, TajoRMContext 
rmContext) {
+    super(SimpleScheduler.class.getName());
+    this.masterContext = context;
+    this.rmContext = rmContext;
+    // 11 is PriorityBlockingQueue's default initial capacity; it must be passed explicitly to supply a comparator.
+    this.queryQueue = new PriorityBlockingQueue<QuerySchedulingInfo>(11, 
COMPARATOR);
+    this.queryProcessor = new Thread(new QueryProcessor());
+  }
+
+  public SimpleScheduler(TajoMaster.MasterContext context) {
+    this(context, context.getResourceManager().getRMContext());
+  }
+
+  private void initScheduler(TajoConf conf) {
+    
this.minResource.setMemory(conf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY)).setVirtualCores(1);
+    
this.qmMinResource.setMemory(conf.getIntVar(TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY)).setVirtualCores(1);
+    updateResource();
+    this.queryProcessor.setName("Query Processor");
+  }
+
+  private void updateResource() {
+    NodeResource resource = NodeResources.createResource(0);
+    NodeResource totalResource = NodeResources.createResource(0);
+    for (NodeStatus nodeStatus : getRMContext().getNodes().values()) {
+      NodeResources.addTo(resource, nodeStatus.getAvailableResource());
+      NodeResources.addTo(totalResource, 
nodeStatus.getTotalResourceCapability());
+
+    }
+
+    NodeResources.update(maxResource, totalResource);
+    NodeResources.update(clusterResource, resource);
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Cluster Resource. available : " + getClusterResource()
+          + " maximum: " + getMaximumResourceCapability());
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    this.tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
+    initScheduler(tajoConf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    this.queryProcessor.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    this.isStopped = true;
+    super.serviceStop();
+  }
+
+  @Override
+  public int getRunningQuery() {
+    return assignedQueryMasterMap.size();
+  }
+
+  @Override
+  public ResourceCalculator getResourceCalculator() {
+    return resourceCalculator;
+  }
+
+  private NodeResourceRequest createQMResourceRequest(QueryInfo queryInfo) {
+    NodeResource qmResource = getQMMinimumResourceCapability();
+
+    int containers = 1;
+    Set<Integer> assignedQMNodes = 
Sets.newHashSet(assignedQueryMasterMap.values());
+    List<Integer> idleNode = Lists.newArrayList();
+
+    for (NodeStatus nodeStatus : getRMContext().getNodes().values()) {
+
+      //find idle node for QM
+      if (!assignedQMNodes.contains(nodeStatus.getWorkerId())) {
+        idleNode.add(nodeStatus.getWorkerId());
+      }
+
+      if (idleNode.size() > containers * 3) break;
+    }
+
+    NodeResourceRequest.Builder builder = NodeResourceRequest.newBuilder();
+
+    builder.setQueryId(queryInfo.getQueryId().getProto())
+        .setCapacity(qmResource.getProto())
+        .setType(ResourceType.QUERYMASTER)
+        .setPriority(1)
+        .setNumContainers(containers)
+        .setRunningTasks(1)
+        .addAllCandidateNodes(idleNode)
+        .setUserId(queryInfo.getQueryContext().getUser());
+    //TODO .setQueue(queryInfo.getQueue());
+    return builder.build();
+  }
+
+
+  @Override
+  public int getNumClusterNodes() {
+    return rmContext.getNodes().size();
+  }
+
+  @Override
+  public List<AllocationResourceProto>
+  reserve(QueryId queryId, NodeResourceRequest request) {
+
+    List<AllocationResourceProto> reservedResources;
+    NodeResource capacity = new NodeResource(request.getCapacity());
+    if (!NodeResources.fitsIn(capacity, getClusterResource())) {
+      return Lists.newArrayList();
+    }
+
+    LinkedList<Integer> workers = new LinkedList<Integer>();
+
+    if (request.getCandidateNodesCount() > 0) {
+      workers.addAll(request.getCandidateNodesList());
+      Collections.shuffle(workers);
+    }
+
+    int requiredContainers = request.getNumContainers();
+    // reserve resource from candidate workers for locality
+    reservedResources = reserveClusterResource(workers, capacity, 
requiredContainers);
+
+    // reserve resource in random workers
+    if (reservedResources.size() < requiredContainers) {
+      LinkedList<Integer> randomNodes = new 
LinkedList<Integer>(getRMContext().getNodes().keySet());
+      Collections.shuffle(randomNodes);
+
+      reservedResources.addAll(reserveClusterResource(
+          randomNodes, capacity, requiredContainers - 
reservedResources.size()));
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Request: " + request.getCapacity() + ", containerNum:" + 
request.getNumContainers()
+          + "Current cluster resource: " + getClusterResource());
+    }
+    return reservedResources;
+  }
+
+  private List<AllocationResourceProto> reserveClusterResource(List<Integer> 
workers,
+                                                               NodeResource 
capacity, int requiredNum) {
+
+    List<AllocationResourceProto> reservedResources = Lists.newArrayList();
+    AllocationResourceProto.Builder resourceBuilder = 
AllocationResourceProto.newBuilder();
+    int allocatedResources = 0;
+
+    while (workers.size() > 0) {
+      Iterator<Integer> iter = workers.iterator();
+      while (iter.hasNext()) {
+
+        int workerId = iter.next();
+        NodeStatus nodeStatus = getRMContext().getNodes().get(workerId);
+        if (nodeStatus == null) {
+          iter.remove();
+          LOG.warn("Can't find the node. id :" + workerId);
+          continue;
+        } else {
+          if (NodeResources.fitsIn(capacity, 
nodeStatus.getAvailableResource())) {
+            NodeResources.subtractFrom(getClusterResource(), capacity);
+            NodeResources.subtractFrom(nodeStatus.getAvailableResource(), 
capacity);
+            allocatedResources++;
+            resourceBuilder.setResource(capacity.getProto());
+            resourceBuilder.setWorkerId(workerId);
+            reservedResources.add(resourceBuilder.build());
+          } else {
+            // remove unavailable nodeStatus;
+            iter.remove();
+          }
+        }
+
+        if (allocatedResources >= requiredNum) {
+          return reservedResources;
+        }
+      }
+    }
+    return reservedResources;
+  }
+
+
+  @Override
+  public void handle(SchedulerEvent event) {
+    switch (event.getType()) {
+      case RESOURCE_RESERVE:
+        //TODO should consider request priority
+        reserveResource(TUtil.checkTypeAndGet(event, 
ResourceReserveSchedulerEvent.class));
+        break;
+      case RESOURCE_UPDATE:
+        updateResource();
+        break;
+      default:
+        break;
+
+    }
+  }
+
+  /**
+   * This is an asynchronous call. You should use a callback to get reserved 
resource containers.
+   */
+  protected void reserveResource(ResourceReserveSchedulerEvent schedulerEvent) 
{
+    List<AllocationResourceProto> resources =
+        reserve(new QueryId(schedulerEvent.getRequest().getQueryId()), 
schedulerEvent.getRequest());
+
+    NodeResourceResponse.Builder response = NodeResourceResponse.newBuilder();
+    response.setQueryId(schedulerEvent.getRequest().getQueryId());
+    
schedulerEvent.getCallBack().run(response.addAllResource(resources).build());
+  }
+
+  /**
+   * Submit a query to scheduler
+   */
+  public void submitQuery(QuerySchedulingInfo schedulingInfo) {
+    queryQueue.add(schedulingInfo);
+    pendingQueryMap.put(schedulingInfo.getQueryId(), schedulingInfo);
+  }
+
+  protected boolean startQuery(QueryId queryId, AllocationResourceProto 
allocation) {
+   return masterContext.getQueryJobManager().startQueryJob(queryId, 
allocation);
+  }
+
+  public void stopQuery(QueryId queryId) {
+    if(pendingQueryMap.containsKey(queryId)){
+      queryQueue.remove(pendingQueryMap.remove(queryId));
+    }
+    assignedQueryMasterMap.remove(queryId);
+  }
+
+  public BlockingQueue<QuerySchedulingInfo> getQueryQueue() {
+    return queryQueue;
+  }
+
+  private NodeStatus getWorker(int workerId) {
+    return rmContext.getNodes().get(workerId);
+  }
+
+  protected TajoRMContext getRMContext() {
+    return rmContext;
+  }
+
+  public WorkerConnectionInfo getQueryMaster(QueryId queryId) {
+    if (assignedQueryMasterMap.containsKey(queryId)) {
+      return 
rmContext.getNodes().get(assignedQueryMasterMap.get(queryId)).getConnectionInfo();
+    }
+    return null;
+  }
+
+  protected QueryInfo getQueryInfo(QueryId queryId) {
+    return 
masterContext.getQueryJobManager().getQueryInProgress(queryId).getQueryInfo();
+  }
+
+  private final class QueryProcessor implements Runnable {
+    @Override
+    public void run() {
+
+      QuerySchedulingInfo query;
+
+      while (!isStopped && !Thread.currentThread().isInterrupted()) {
+        try {
+          query = queryQueue.take();
+        } catch (InterruptedException e) {
+          LOG.warn(e.getMessage(), e);
+          break;
+        }
+        //TODO get by assigned queue
+        int maxAvailable = getResourceCalculator().computeAvailableContainers(
+            getMaximumResourceCapability(), getQMMinimumResourceCapability());
+
+        // Limit concurrently running QueryMasters to 50% (MAXIMUM_RUNNING_QM_RATE) of the QM containers that fit in the cluster.
+        if (assignedQueryMasterMap.size() >= Math.floor(maxAvailable * 
MAXIMUM_RUNNING_QM_RATE)) {
+          queryQueue.add(query);
+          synchronized (this) {
+            try {
+              this.wait(1000);
+            } catch (InterruptedException e) {
+              if(!isStopped) {
+                LOG.fatal(e.getMessage(), e);
+                return;
+              }
+            }
+          }
+        } else {
+          QueryInfo queryInfo = getQueryInfo(query.getQueryId());
+          List<AllocationResourceProto> allocation = 
reserve(query.getQueryId(), createQMResourceRequest(queryInfo));
+
+          if(allocation.size() == 0) {
+            queryQueue.add(query);
+            LOG.info("No Available Resources for QueryMaster :" + 
queryInfo.getQueryId() + "," + queryInfo);
+
+            synchronized (this) {
+              try {
+                this.wait(100);
+              } catch (InterruptedException e) {
+                LOG.fatal(e);
+              }
+            }
+          } else {
+            try {
+              //if QM resource can't be allocated to a node, it should retry
+              boolean started = startQuery(query.getQueryId(), 
allocation.get(0));
+              if(!started) {
+                queryQueue.put(query);
+              } else {
+                assignedQueryMasterMap.put(query.getQueryId(), 
allocation.get(0).getWorkerId());
+              }
+            } catch (Throwable t) {
+              LOG.fatal("Exception during query startup:", t);
+              masterContext.getQueryJobManager().stopQuery(query.getQueryId());
+            }
+          }
+        }
+        LOG.info("Running Queries: " + assignedQueryMasterMap.size());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/TajoResourceScheduler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/TajoResourceScheduler.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/TajoResourceScheduler.java
new file mode 100644
index 0000000..c7c37c4
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/TajoResourceScheduler.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.ResourceProtos.NodeResourceRequest;
+import org.apache.tajo.ResourceProtos.AllocationResourceProto;
+import org.apache.tajo.master.scheduler.event.SchedulerEvent;
+import org.apache.tajo.resource.NodeResource;
+
+import java.util.List;
+
+/**
+ * This interface is used by the scheduler for allocating resources.
+ */
+public interface TajoResourceScheduler extends EventHandler<SchedulerEvent> {
+
+  /**
+   * Get the resource currently available in the cluster (capacity not yet reserved).
+   * @return the currently available resource of the cluster.
+   */
+
+  NodeResource getClusterResource();
+
+  /**
+   * Get minimum allocatable {@link NodeResource}.
+   * @return minimum allocatable resource
+   */
+  NodeResource getMinimumResourceCapability();
+
+  /**
+   * Get minimum allocatable {@link NodeResource} of QueryMaster.
+   * @return minimum allocatable resource
+   */
+  NodeResource getQMMinimumResourceCapability();
+
+  /**
+   * Get maximum allocatable {@link NodeResource}.
+   * @return maximum allocatable resource
+   */
+  NodeResource getMaximumResourceCapability();
+
+  /**
+   * Get the number of nodes available in the cluster.
+   * @return the number of available nodes.
+   */
+  int getNumClusterNodes();
+
+  /**
+   * Reserve resources for a query. The cluster resource is updated by 
TajoResourceTracker.
+   * Request one or more resource containers. You can set the number of 
containers and resource capabilities,
+   * such as memory, CPU cores, and disk slots.
+   * @return the list of reserved resource allocations.
+   */
+  List<AllocationResourceProto>
+  reserve(QueryId queryId, NodeResourceRequest ask);
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/ResourceReserveSchedulerEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/ResourceReserveSchedulerEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/ResourceReserveSchedulerEvent.java
new file mode 100644
index 0000000..47ee53b
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/ResourceReserveSchedulerEvent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.scheduler.event;
+
+import com.google.protobuf.RpcCallback;
+import static org.apache.tajo.ResourceProtos.NodeResourceRequest;
+import static org.apache.tajo.ResourceProtos.NodeResourceResponse;
+
+public class ResourceReserveSchedulerEvent extends SchedulerEvent {
+
+  private NodeResourceRequest request;
+
+  private RpcCallback<NodeResourceResponse> callBack;
+
+  public ResourceReserveSchedulerEvent(NodeResourceRequest request,
+                                       RpcCallback<NodeResourceResponse> 
callback) {
+    super(SchedulerEventType.RESOURCE_RESERVE);
+    this.request = request;
+    this.callBack = callback;
+  }
+
+  public NodeResourceRequest getRequest() {
+    return request;
+  }
+
+  public RpcCallback<NodeResourceResponse> getCallBack() {
+    return callBack;
+  }
+}

Reply via email to