http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
new file mode 100644
index 0000000..76df397
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ha.HAServiceUtil;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.event.QueryStartEvent;
+import org.apache.tajo.master.event.QueryStopEvent;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.history.QueryHistory;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
+
+// TODO - when exception, send error status to QueryJobManager
+/**
+ * Worker-side service that runs {@link QueryMasterTask}s.
+ *
+ * <p>Query start/stop requests arrive as {@link QueryStartEvent}/{@link QueryStopEvent} through an
+ * {@link AsyncDispatcher}. Three background threads run while the service is up:
+ * <ul>
+ *   <li>{@link QueryHeartbeatThread} reports every running query to the TajoMaster roughly every
+ *       2 seconds;</li>
+ *   <li>{@link ClientSessionTimeoutCheckThread} expires queries whose last client heartbeat is
+ *       older than {@code QUERY_SESSION_TIMEOUT} seconds;</li>
+ *   <li>{@link FinishedQueryMasterTaskCleanThread} hourly evicts finished queries whose start time
+ *       is older than {@code WORKER_HISTORY_EXPIRE_PERIOD} minutes.</li>
+ * </ul>
+ */
+public class QueryMaster extends CompositeService implements EventHandler {
+  private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
+
+  /** Client session timeout in seconds (QUERY_SESSION_TIMEOUT). */
+  private int querySessionTimeout;
+
+  private Clock clock;
+
+  private AsyncDispatcher dispatcher;
+
+  private GlobalPlanner globalPlanner;
+
+  private TajoConf systemConf;
+
+  /** Queries that have been started and not yet stopped. */
+  private final Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
+
+  /** Stopped queries, kept until FinishedQueryMasterTaskCleanThread evicts them. */
+  private final Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap();
+
+  private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
+
+  /** Set once by stop(); the heartbeat thread also waits on this object's monitor. */
+  private final AtomicBoolean queryMasterStop = new AtomicBoolean(false);
+
+  private QueryMasterContext queryMasterContext;
+
+  /** Query context of the most recently started query. */
+  private QueryContext queryContext;
+
+  private QueryHeartbeatThread queryHeartbeatThread;
+
+  private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread;
+
+  private final TajoWorker.WorkerContext workerContext;
+
+  private RpcConnectionPool connPool;
+
+  private ExecutorService eventExecutor;
+
+  public QueryMaster(TajoWorker.WorkerContext workerContext) {
+    super(QueryMaster.class.getName());
+    this.workerContext = workerContext;
+  }
+
+  /**
+   * Reads configuration, creates the dispatcher and global planner and registers the query
+   * start/stop handlers.
+   *
+   * @param conf must be a {@link TajoConf}
+   * @throws RuntimeException wrapping any failure during initialization
+   */
+  @Override
+  public void init(Configuration conf) {
+    LOG.info("QueryMaster init");
+    try {
+      this.systemConf = (TajoConf)conf;
+      this.connPool = RpcConnectionPool.getPool(systemConf);
+
+      querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
+      queryMasterContext = new QueryMasterContext(systemConf);
+
+      clock = new SystemClock();
+
+      this.dispatcher = new AsyncDispatcher();
+      addIfService(dispatcher);
+
+      globalPlanner = new GlobalPlanner(systemConf, workerContext);
+
+      dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
+      dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler());
+
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+      throw new RuntimeException(t);
+    }
+    super.init(conf);
+  }
+
+  /** Starts the background threads and the event executor, then the child services. */
+  @Override
+  public void start() {
+    LOG.info("QueryMaster start");
+
+    queryHeartbeatThread = new QueryHeartbeatThread();
+    queryHeartbeatThread.start();
+
+    clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
+    clientSessionTimeoutCheckThread.start();
+
+    finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread();
+    finishedQueryMasterTaskCleanThread.start();
+
+    eventExecutor = Executors.newSingleThreadExecutor();
+    super.start();
+  }
+
+  /**
+   * Stops the background threads and child services. Only the first call has an effect.
+   * In YARN container mode the hosting worker is stopped as well.
+   */
+  @Override
+  public void stop() {
+    if(queryMasterStop.getAndSet(true)){
+      return;
+    }
+
+    if(queryHeartbeatThread != null) {
+      queryHeartbeatThread.interrupt();
+    }
+
+    if(clientSessionTimeoutCheckThread != null) {
+      clientSessionTimeoutCheckThread.interrupt();
+    }
+
+    if(finishedQueryMasterTaskCleanThread != null) {
+      finishedQueryMasterTaskCleanThread.interrupt();
+    }
+
+    if(eventExecutor != null){
+      eventExecutor.shutdown();
+    }
+
+    super.stop();
+
+    LOG.info("QueryMaster stop");
+    if(queryMasterContext.getWorkerContext().isYarnContainerMode()) {
+      queryMasterContext.getWorkerContext().stopWorker(true);
+    }
+  }
+
+  /**
+   * Obtains a pooled connection to the TajoMaster.
+   *
+   * <p>In TajoMaster HA mode a backup master may have become active, so the currently known
+   * master can be unreachable. In that case the master addresses stored in the worker context are
+   * refreshed from {@link HAServiceUtil} and the connection is retried once. The caller must hand
+   * the returned client back with {@code connPool.releaseConnection}.
+   */
+  private NettyClientBase getTajoMasterConnection() throws Exception {
+    if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      try {
+        return connPool.getConnection(workerContext.getTajoMasterAddress(),
+            TajoMasterProtocol.class, true);
+      } catch (Exception e) {
+        workerContext.setWorkerResourceTrackerAddr(
+            HAServiceUtil.getResourceTrackerAddress(systemConf));
+        workerContext.setTajoMasterAddress(
+            HAServiceUtil.getMasterUmbilicalAddress(systemConf));
+      }
+    }
+    return connPool.getConnection(workerContext.getTajoMasterAddress(),
+        TajoMasterProtocol.class, true);
+  }
+
+  /** Obtains a pooled connection to the peer RPC port of the given worker. */
+  private NettyClientBase getWorkerConnection(TajoProtos.WorkerConnectionInfoProto connectionInfo)
+      throws Exception {
+    return connPool.getConnection(
+        NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
+        TajoWorkerProtocol.class, true);
+  }
+
+  /**
+   * Asks every known worker to clean up the given execution blocks. This is best effort:
+   * unreachable workers are logged and skipped.
+   */
+  protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) {
+    StringBuilder cleanupMessage = new StringBuilder();
+    String prefix = "";
+    for (TajoIdProtos.ExecutionBlockIdProto eachEbId: executionBlockIds) {
+      cleanupMessage.append(prefix).append(new ExecutionBlockId(eachEbId).toString());
+      prefix = ",";
+    }
+    LOG.info("cleanup executionBlocks: " + cleanupMessage);
+    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+    TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto =
+        TajoWorkerProtocol.ExecutionBlockListProto.newBuilder()
+            .addAllExecutionBlockId(executionBlockIds)
+            .build();
+
+    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+      TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
+      // Scoped per worker: otherwise a failed getConnection() would make the finally block
+      // release the previous worker's connection a second time.
+      NettyClientBase rpc = null;
+      try {
+        rpc = getWorkerConnection(connectionInfo);
+        TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
+
+        tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto,
+            NullCallback.get());
+      } catch (Exception e) {
+        LOG.warn("Failed to send cleanupExecutionBlocks to " + connectionInfo.getHost() + ": "
+            + e.getMessage(), e);
+      } finally {
+        connPool.releaseConnection(rpc);
+      }
+    }
+  }
+
+  /** Asks every known worker to release the resources of the given query (best effort). */
+  private void cleanup(QueryId queryId) {
+    LOG.info("cleanup query resources : " + queryId);
+    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+
+    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+      TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
+      // Scoped per worker for the same reason as in cleanupExecutionBlock().
+      NettyClientBase rpc = null;
+      try {
+        rpc = getWorkerConnection(connectionInfo);
+        TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
+
+        tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
+      } catch (Exception e) {
+        LOG.error("Failed to send cleanup of " + queryId + " to " + connectionInfo.getHost()
+            + ": " + e.getMessage(), e);
+      } finally {
+        connPool.releaseConnection(rpc);
+      }
+    }
+  }
+
+  /**
+   * Fetches the resource list of all workers from the TajoMaster, waiting at most 2 seconds.
+   *
+   * @return the workers, or an empty list if the request fails
+   */
+  public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
+    NettyClientBase rpc = null;
+    try {
+      rpc = getTajoMasterConnection();
+      TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub();
+
+      CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack =
+          new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
+      masterService.getAllWorkerResource(callBack.getController(),
+          PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
+
+      TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest =
+          callBack.get(2, TimeUnit.SECONDS);
+      return workerResourcesRequest.getWorkerResourcesList();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(rpc);
+    }
+    return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
+  }
+
+  /**
+   * Sends a heartbeat carrying {@code state} for {@code queryId} to the TajoMaster.
+   * The call does not wait for the response; failures are logged.
+   */
+  public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
+    LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
+    NettyClientBase tmClient = null;
+    try {
+      tmClient = getTajoMasterConnection();
+      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+
+      TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
+          .setConnectionInfo(workerContext.getConnectionInfo().getProto())
+          .setState(state)
+          .setQueryId(queryId.getProto());
+
+      CallFuture<TajoHeartbeatResponse> callBack =
+          new CallFuture<TajoHeartbeatResponse>();
+
+      masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(),
+          callBack);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(tmClient);
+    }
+  }
+
+  @Override
+  public void handle(Event event) {
+    dispatcher.getEventHandler().handle(event);
+  }
+
+  /**
+   * @return the {@link Query} of a running query, or {@code null} if {@code queryId} is not
+   *         running on this QueryMaster
+   */
+  public Query getQuery(QueryId queryId) {
+    QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId);
+    return queryMasterTask == null ? null : queryMasterTask.getQuery();
+  }
+
+  /** @return the running task for {@code queryId}, or {@code null} */
+  public QueryMasterTask getQueryMasterTask(QueryId queryId) {
+    return queryMasterTasks.get(queryId);
+  }
+
+  /**
+   * @return the running task for {@code queryId}; if none and {@code includeFinished} is set,
+   *         the finished task; otherwise {@code null}
+   */
+  public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) {
+    QueryMasterTask queryMasterTask =  queryMasterTasks.get(queryId);
+    if(queryMasterTask != null) {
+      return queryMasterTask;
+    } else {
+      if(includeFinished) {
+        return finishedQueryMasterTasks.get(queryId);
+      } else {
+        return null;
+      }
+    }
+  }
+
+  public QueryMasterContext getContext() {
+    return this.queryMasterContext;
+  }
+
+  public Collection<QueryMasterTask> getQueryMasterTasks() {
+    return queryMasterTasks.values();
+  }
+
+  public Collection<QueryMasterTask> getFinishedQueryMasterTasks() {
+    return finishedQueryMasterTasks.values();
+  }
+
+  /** Gives query-level components access to the shared state of this QueryMaster. */
+  public class QueryMasterContext {
+    private TajoConf conf;
+
+    public QueryMasterContext(TajoConf conf) {
+      this.conf = conf;
+    }
+
+    public TajoConf getConf() {
+      return conf;
+    }
+
+    public ExecutorService getEventExecutor(){
+      return eventExecutor;
+    }
+
+    public AsyncDispatcher getDispatcher() {
+      return dispatcher;
+    }
+
+    public Clock getClock() {
+      return clock;
+    }
+
+    public QueryMaster getQueryMaster() {
+      return QueryMaster.this;
+    }
+
+    public GlobalPlanner getGlobalPlanner() {
+      return globalPlanner;
+    }
+
+    public TajoWorker.WorkerContext getWorkerContext() {
+      return workerContext;
+    }
+
+    public EventHandler getEventHandler() {
+      return dispatcher.getEventHandler();
+    }
+
+    /**
+     * Stops a running query. It moves the query to the finished set and sends a final heartbeat
+     * to the TajoMaster. It then stops the task, asks every worker to clean up the query
+     * (unless DEBUG_ENABLED is set), and appends the query history. In YARN container mode the
+     * whole QueryMaster is stopped afterwards.
+     *
+     * @param queryId query to stop; an id that is not running is logged and ignored
+     */
+    public void stopQuery(QueryId queryId) {
+      QueryMasterTask queryMasterTask = queryMasterTasks.remove(queryId);
+      if(queryMasterTask == null) {
+        LOG.warn("No query info:" + queryId);
+        return;
+      }
+
+      finishedQueryMasterTasks.put(queryId, queryMasterTask);
+
+      TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
+      CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();
+
+      NettyClientBase tmClient = null;
+      try {
+        tmClient = getTajoMasterConnection();
+        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
+      }  catch (Exception e) {
+        // While the cluster is being stopped the TajoMaster may already have closed the
+        // connection; log it and keep stopping the query.
+        LOG.error(e.getMessage(), e);
+      } finally {
+        connPool.releaseConnection(tmClient);
+      }
+
+      try {
+        queryMasterTask.stop();
+        // NOTE(review): queryContext is the context of the most recently *started* query, not
+        // necessarily of queryId, so DEBUG_ENABLED may be read from another query when several
+        // run concurrently. Confirm, and switch to this query's own context.
+        if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) {
+          cleanup(queryId);
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+      Query query = queryMasterTask.getQuery();
+      if (query != null) {
+        QueryHistory queryHistory = query.getQueryHistory();
+        if (queryHistory != null) {
+          query.context.getQueryMasterContext().getWorkerContext().
+              getTaskHistoryWriter().appendHistory(queryHistory);
+        }
+      }
+      if(workerContext.isYarnContainerMode()) {
+        stop();
+      }
+    }
+  }
+
+  /** Builds the heartbeat that reports the state (and, if available, progress) of one query. */
+  private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
+    TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
+
+    builder.setConnectionInfo(workerContext.getConnectionInfo().getProto());
+    builder.setQueryId(queryMasterTask.getQueryId().getProto());
+    builder.setState(queryMasterTask.getState());
+    Query query = queryMasterTask.getQuery();
+    if (query != null) {
+      if (query.getResultDesc() != null) {
+        builder.setResultDesc(query.getResultDesc().getProto());
+      }
+      builder.setQueryProgress(query.getProgress());
+      builder.setQueryFinishTime(query.getFinishTime());
+    }
+    return builder.build();
+  }
+
+  /** Registers, initializes and starts a QueryMasterTask; stops it again if init failed. */
+  private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
+    @Override
+    public void handle(QueryStartEvent event) {
+      LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
+      QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
+          event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(),
+          event.getLogicalPlanJson());
+
+      synchronized(queryMasterTasks) {
+        queryMasterTasks.put(event.getQueryId(), queryMasterTask);
+      }
+
+      queryMasterTask.init(systemConf);
+      if (!queryMasterTask.isInitError()) {
+        queryMasterTask.start();
+      }
+
+      queryContext = event.getQueryContext();
+
+      if (queryMasterTask.isInitError()) {
+        queryMasterContext.stopQuery(queryMasterTask.getQueryId());
+        return;
+      }
+    }
+  }
+
+  private class QueryStopEventHandler implements EventHandler<QueryStopEvent> {
+    @Override
+    public void handle(QueryStopEvent event) {
+      queryMasterContext.stopQuery(event.getQueryId());
+    }
+  }
+
+  /** Sends a heartbeat for every running query to the TajoMaster about every 2 seconds. */
+  class QueryHeartbeatThread extends Thread {
+    public QueryHeartbeatThread() {
+      super("QueryHeartbeatThread");
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Start QueryMaster heartbeat thread");
+      while(!queryMasterStop.get()) {
+        List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
+        synchronized(queryMasterTasks) {
+          tempTasks.addAll(queryMasterTasks.values());
+        }
+        // Iterate over the snapshot without holding the queryMasterTasks monitor, so slow
+        // heartbeat RPCs do not block QueryStartEventHandler from registering new queries.
+        for(QueryMasterTask eachTask: tempTasks) {
+          NettyClientBase tmClient = null;
+          try {
+            tmClient = getTajoMasterConnection();
+            TajoMasterProtocol.TajoMasterProtocolService masterClientService =
+                tmClient.getStub();
+
+            CallFuture<TajoHeartbeatResponse> callBack =
+                new CallFuture<TajoHeartbeatResponse>();
+
+            TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
+            masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
+          } catch (Throwable t) {
+            LOG.error("Failed to send heartbeat for " + eachTask.getQueryId(), t);
+          } finally {
+            // Every other call site in this class hands connections back to the pool.
+            connPool.releaseConnection(tmClient);
+          }
+        }
+        synchronized(queryMasterStop) {
+          try {
+            queryMasterStop.wait(2000);
+          } catch (InterruptedException e) {
+            break;
+          }
+        }
+      }
+      LOG.info("QueryMaster heartbeat thread stopped");
+    }
+  }
+
+  /** Once per second, expires running queries whose client heartbeat is too old. */
+  class ClientSessionTimeoutCheckThread extends Thread {
+    @Override
+    public void run() {
+      LOG.info("ClientSessionTimeoutCheckThread started");
+      while(!queryMasterStop.get()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          break;
+        }
+        List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
+        synchronized(queryMasterTasks) {
+          tempTasks.addAll(queryMasterTasks.values());
+        }
+
+        // Widen to long before multiplying so a large timeout cannot overflow int.
+        long sessionTimeoutMs = querySessionTimeout * 1000L;
+        for(QueryMasterTask eachTask: tempTasks) {
+          if(!eachTask.isStopped()) {
+            try {
+              long lastHeartbeat = eachTask.getLastClientHeartbeat();
+              long time = System.currentTimeMillis() - lastHeartbeat;
+              // lastHeartbeat <= 0 presumably means "no client heartbeat yet"; such queries
+              // are skipped.
+              if(lastHeartbeat > 0 && time > sessionTimeoutMs) {
+                LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query session timeout: " + time + " ms");
+                eachTask.expireQuerySession();
+              }
+            } catch (Exception e) {
+              LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /** Hourly, evicts finished queries older than WORKER_HISTORY_EXPIRE_PERIOD minutes. */
+  class FinishedQueryMasterTaskCleanThread extends Thread {
+    @Override
+    public void run() {
+      int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
+      LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
+      while(!queryMasterStop.get()) {
+        try {
+          Thread.sleep(60 * 1000 * 60);   // hourly
+        } catch (InterruptedException e) {
+          break;
+        }
+        try {
+          // Long arithmetic: minutes * 60000 overflows int beyond roughly 24 days.
+          long expireTime = System.currentTimeMillis() - expireIntervalTime * 60L * 1000L;
+          cleanExpiredFinishedQueryMasterTask(expireTime);
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    }
+
+    /** Removes finished tasks whose start time is before {@code expireTime} (epoch millis). */
+    private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
+      synchronized(finishedQueryMasterTasks) {
+        List<QueryId> expiredQueryIds = new ArrayList<QueryId>();
+        for(Map.Entry<QueryId, QueryMasterTask> entry: finishedQueryMasterTasks.entrySet()) {
+          if(entry.getValue().getStartTime() < expireTime) {
+            expiredQueryIds.add(entry.getKey());
+          }
+        }
+
+        for(QueryId eachId: expiredQueryIds) {
+          finishedQueryMasterTasks.remove(eachId);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
new file mode 100644
index 0000000..4a91326
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.net.InetSocketAddress;
+
+public class QueryMasterManagerService extends CompositeService
+    implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
+  private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName());
+
+  /** RPC server exposing QueryMasterProtocol; created and started in init(), shut down in stop(). */
+  private AsyncRpcServer rpcServer;
+  /** Connect address derived from the RPC server's listen address in init(). */
+  private InetSocketAddress bindAddr;
+  /** "host:port" form of bindAddr; published as WORKER_QM_RPC_ADDRESS in init(). */
+  private String addr;
+  /** Requested listen port; replaced by the actually bound port in init(). */
+  private int port;
+
+  /** QueryMaster created and registered as a child service in init(). */
+  private QueryMaster queryMaster;
+
+  private TajoWorker.WorkerContext workerContext;
+
+  /**
+   * Creates the service. Nothing is bound until {@link #init(Configuration)} runs.
+   *
+   * @param workerContext context of the hosting worker
+   * @param port requested RPC listen port
+   */
+  public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) {
+    super(QueryMasterManagerService.class.getName());
+    this.port = port;
+    this.workerContext = workerContext;
+  }
+
+  /** @return the QueryMaster created by init(), or null if init() has not completed it */
+  public QueryMaster getQueryMaster() {
+    return this.queryMaster;
+  }
+
+  /**
+   * Starts the QueryMaster RPC server on all interfaces and registers a new {@link QueryMaster}
+   * as a child service. It also publishes the bound "host:port" as WORKER_QM_RPC_ADDRESS in
+   * {@code conf}.
+   *
+   * @param conf must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a TajoConf
+   * @throws RuntimeException if the RPC server or the QueryMaster cannot be set up; previously
+   *         this was only logged, leaving a half-initialized service with a null address
+   */
+  @Override
+  public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf tajoConf = (TajoConf) conf;
+    try {
+      // Setup RPC server
+      InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+      this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum);
+      this.rpcServer.start();
+
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+
+      this.port = bindAddr.getPort();
+
+      queryMaster = new QueryMaster(workerContext);
+      addService(queryMaster);
+
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      // Fail fast, like QueryMaster.init(): continuing would publish a null address and leave
+      // queryMaster unset, which only fails later and far from the cause.
+      throw new RuntimeException(e);
+    }
+    // Get the master address
+    LOG.info("QueryMasterManagerService is bind to " + addr);
+    tajoConf.setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if(rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("QueryMasterManagerService stopped");
+    super.stop();
+  }
+
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  public String getHostAndPort() {
+    return bindAddr.getHostName() + ":" + bindAddr.getPort();
+  }
+
+  /**
+   * Handles a task request from a worker's task runner. If the query is unknown or already
+   * stopped, it answers with {@code DefaultTaskScheduler.stopTaskRunnerReq}. Otherwise it hands
+   * the request (and {@code done}) to the query's task scheduling via a TaskRequestEvent.
+   */
+  @Override
+  public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
+                      RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) {
+    try {
+      ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
+      QueryMasterTask queryMasterTask =
+          workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
+
+      if(queryMasterTask == null || queryMasterTask.isStopped()) {
+        done.run(DefaultTaskScheduler.stopTaskRunnerReq);
+      } else {
+        TajoContainerId cid = queryMasterTask.getQueryTaskContext().getResourceAllocator()
+            .makeContainerId(request.getContainerId());
+        // Guarded like statusUpdate() so the message is only built when debug is enabled.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("getTask:" + cid + ", ebId:" + ebId);
+        }
+        queryMasterTask.handleTaskRequestEvent(
+            new TaskRequestEvent(request.getWorkerId(), cid, ebId, done));
+      }
+    } catch (Exception e) {
+      // NOTE(review): `done` is not invoked on this path, so the caller gets no response from
+      // this handler. Confirm that this is intended.
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Applies a task attempt status report. A TA_KILLED report is delivered directly to the
+   * attempt as TA_LOCAL_KILLED. Any other state is forwarded to the query's event handler.
+   * Replies TRUE on success and FALSE if the query is unknown or processing fails.
+   */
+  @Override
+  public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
+      TaskAttemptId attemptId = new TaskAttemptId(request.getId());
+      // Running queries first, then finished ones: a report may still arrive after stopQuery()
+      // has moved the query to the finished set.
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
+      if (queryMasterTask == null) {
+        // Previously this surfaced as a NullPointerException logged with a null message.
+        LOG.warn("No QueryMasterTask for status update: " + attemptId);
+        done.run(TajoWorker.FALSE_PROTO);
+        return;
+      }
+      Stage stage = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
+      Task task = stage.getTask(attemptId.getTaskId());
+      TaskAttempt attempt = task.getAttempt(attemptId.getId());
+
+      if(LOG.isDebugEnabled()){
+        LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(),
+            attempt.getState().name()));
+      }
+
+      if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+        LOG.warn(attemptId + " Killed");
+        attempt.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_LOCAL_KILLED));
+      } else {
+        queryMasterTask.getEventHandler().handle(
+            new TaskAttemptStatusUpdateEvent(attemptId, request));
+      }
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /** Liveness probe: always answers TRUE; the execution block id is not inspected. */
+  @Override
+  public void ping(RpcController controller,
+                   TajoIdProtos.ExecutionBlockIdProto requestProto,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    PrimitiveProtos.BoolProto alive = TajoWorker.TRUE_PROTO;
+    done.run(alive);
+  }
+
+  @Override
+  public void fatalError(RpcController controller, 
TajoWorkerProtocol.TaskFatalErrorReport report,
+                         RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+          new 
QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
+      if (queryMasterTask != null) {
+        queryMasterTask.handleTaskFailed(report);
+      } else {
+        LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
+      }
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  @Override
+  public void done(RpcController controller, 
TajoWorkerProtocol.TaskCompletionReport report,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+          new 
QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
+      if (queryMasterTask != null) {
+        queryMasterTask.getEventHandler().handle(new 
TaskCompletionEvent(report));
+      }
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  @Override
+  public void doneExecutionBlock(
+      RpcController controller, TajoWorkerProtocol.ExecutionBlockReport 
request,
+      RpcCallback<PrimitiveProtos.BoolProto> done) {
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new 
QueryId(request.getEbId().getQueryId()));
+    if (queryMasterTask != null) {
+      ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
+      
queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request);
+    }
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  @Override
+  public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto 
request,
+                        RpcCallback<PrimitiveProtos.BoolProto> done) {
+    QueryId queryId = new QueryId(request);
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+    if (queryMasterTask != null) {
+      Query query = queryMasterTask.getQuery();
+      if (query != null) {
+        query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+      }
+    }
+  }
+
+  @Override
+  public void executeQuery(RpcController controller,
+                           TajoWorkerProtocol.QueryExecutionRequestProto 
request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      workerContext.getWorkerSystemMetrics().counter("querymaster", 
"numQuery").inc();
+
+      QueryId queryId = new QueryId(request.getQueryId());
+      LOG.info("Receive executeQuery request:" + queryId);
+      queryMaster.handle(new QueryStartEvent(queryId,
+          new Session(request.getSession()),
+          new 
QueryContext(workerContext.getQueryMaster().getContext().getConf(),
+              request.getQueryContext()), request.getExprInJson().getValue(),
+          request.getLogicalPlanJson().getValue()));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      workerContext.getWorkerSystemMetrics().counter("querymaster", 
"errorQuery").inc();
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
new file mode 100644
index 0000000..bab5903
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -0,0 +1,638 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tajo.*;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.algebra.JsonHelper;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.ha.HAServiceUtil;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TajoContainerProxy;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.verifier.VerifyException;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageProperty;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.metrics.TajoMetrics;
+import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
+import org.apache.tajo.worker.AbstractResourceAllocator;
+import org.apache.tajo.worker.TajoResourceAllocator;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.tajo.TajoProtos.QueryState;
+
+public class QueryMasterTask extends CompositeService {
+  private static final Log LOG = 
LogFactory.getLog(QueryMasterTask.class.getName());
+
+  // query submission directory is private!
+  final public static FsPermission STAGING_DIR_PERMISSION =
+      FsPermission.createImmutable((short) 0700); // rwx--------
+
+  public static final String TMP_STAGING_DIR_PREFIX = ".staging";
+
+  private QueryId queryId;
+
+  private Session session;
+
+  private QueryContext queryContext;
+
+  private QueryMasterTaskContext queryTaskContext;
+
+  private QueryMaster.QueryMasterContext queryMasterContext;
+
+  private Query query;
+
+  private MasterPlan masterPlan;
+
+  private String jsonExpr;
+
+  private String logicalPlanJson;
+
+  private AsyncDispatcher dispatcher;
+
+  private final long querySubmitTime;
+
+  private Map<String, TableDesc> tableDescMap = new HashMap<String, 
TableDesc>();
+
+  private TajoConf systemConf;
+
+  private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
+
+  private AbstractResourceAllocator resourceAllocator;
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private TajoMetrics queryMetrics;
+
+  private Throwable initError;
+
+  private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics =
+      new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
+
+  public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
+                         QueryId queryId, Session session, QueryContext 
queryContext, String jsonExpr,
+                         String logicalPlanJson) {
+
+    super(QueryMasterTask.class.getName());
+    this.queryMasterContext = queryMasterContext;
+    this.queryId = queryId;
+    this.session = session;
+    this.queryContext = queryContext;
+    this.jsonExpr = jsonExpr;
+    this.logicalPlanJson = logicalPlanJson;
+    this.querySubmitTime = System.currentTimeMillis();
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    systemConf = (TajoConf)conf;
+
+    try {
+      queryTaskContext = new QueryMasterTaskContext();
+      String resourceManagerClassName = 
systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);
+
+      
if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) 
>= 0) {
+        resourceAllocator = new TajoResourceAllocator(queryTaskContext);
+      } else {
+        throw new UnimplementedException(resourceManagerClassName + " is not 
supported yet");
+      }
+      addService(resourceAllocator);
+
+      dispatcher = new AsyncDispatcher();
+      addService(dispatcher);
+
+      dispatcher.register(StageEventType.class, new StageEventDispatcher());
+      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+      dispatcher.register(TaskAttemptEventType.class, new 
TaskAttemptEventDispatcher());
+      dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new 
QueryFinishEventHandler());
+      dispatcher.register(TaskSchedulerEvent.EventType.class, new 
TaskSchedulerDispatcher());
+      dispatcher.register(LocalTaskEventType.class, new 
LocalTaskEventHandler());
+
+      initStagingDir();
+
+      queryMetrics = new TajoMetrics(queryId.toString());
+
+      super.init(systemConf);
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+      initError = t;
+    }
+  }
+
+  public boolean isStopped() {
+    return stopped.get();
+  }
+
+  @Override
+  public void start() {
+    startQuery();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+
+    if(stopped.getAndSet(true)) {
+      return;
+    }
+
+    LOG.info("Stopping QueryMasterTask:" + queryId);
+
+    try {
+      resourceAllocator.stop();
+    } catch (Throwable t) {
+      LOG.fatal(t.getMessage(), t);
+    }
+
+    RpcConnectionPool connPool = 
RpcConnectionPool.getPool(queryMasterContext.getConf());
+    NettyClientBase tmClient = null;
+    try {
+      // In TajoMaster HA mode, if backup master be active status,
+      // worker may fail to connect existing active master. Thus,
+      // if worker can't connect the master, worker should try to connect 
another master and
+      // update master address in worker context.
+      if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+        try {
+          tmClient = 
connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+              TajoMasterProtocol.class, true);
+        } catch (Exception e) {
+          queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
+              HAServiceUtil.getResourceTrackerAddress(systemConf));
+          queryMasterContext.getWorkerContext().setTajoMasterAddress(
+              HAServiceUtil.getMasterUmbilicalAddress(systemConf));
+          tmClient = 
connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+              TajoMasterProtocol.class, true);
+        }
+      } else {
+        tmClient = 
connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+            TajoMasterProtocol.class, true);
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(tmClient);
+    }
+
+    super.stop();
+
+    //TODO change report to tajo master
+    if (queryMetrics != null) {
+      queryMetrics.report(new MetricsConsoleReporter());
+    }
+
+    LOG.info("Stopped QueryMasterTask:" + queryId);
+  }
+
+  public void handleTaskRequestEvent(TaskRequestEvent event) {
+    ExecutionBlockId id = event.getExecutionBlockId();
+    query.getStage(id).handleTaskRequestEvent(event);
+  }
+
+  public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) 
{
+    synchronized(diagnostics) {
+      if (diagnostics.size() < 10) {
+        diagnostics.add(report);
+      }
+    }
+
+    getEventHandler().handle(new TaskFatalErrorEvent(report));
+  }
+
+  public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() {
+    synchronized(diagnostics) {
+      return Collections.unmodifiableCollection(diagnostics);
+    }
+  }
+
+  private class StageEventDispatcher implements EventHandler<StageEvent> {
+    public void handle(StageEvent event) {
+      ExecutionBlockId id = event.getStageId();
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("StageEventDispatcher:" + id + "," + event.getType());
+      }
+      query.getStage(id).handle(event);
+    }
+  }
+
+  private class TaskEventDispatcher
+      implements EventHandler<TaskEvent> {
+    public void handle(TaskEvent event) {
+      TaskId taskId = event.getTaskId();
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType());
+      }
+      Task task = query.getStage(taskId.getExecutionBlockId()).
+          getTask(taskId);
+      task.handle(event);
+    }
+  }
+
+  private class TaskAttemptEventDispatcher
+      implements EventHandler<TaskAttemptEvent> {
+    public void handle(TaskAttemptEvent event) {
+      TaskAttemptId attemptId = event.getTaskAttemptId();
+      Stage stage = 
query.getStage(attemptId.getTaskId().getExecutionBlockId());
+      Task task = stage.getTask(attemptId.getTaskId());
+      TaskAttempt attempt = task.getAttempt(attemptId);
+      attempt.handle(event);
+    }
+  }
+
+  private class TaskSchedulerDispatcher
+      implements EventHandler<TaskSchedulerEvent> {
+    public void handle(TaskSchedulerEvent event) {
+      Stage stage = query.getStage(event.getExecutionBlockId());
+      stage.getTaskScheduler().handle(event);
+    }
+  }
+
+  private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
+    @Override
+    public void handle(LocalTaskEvent event) {
+      TajoContainerProxy proxy = (TajoContainerProxy) 
resourceAllocator.getContainers().get(event.getContainerId());
+      if (proxy != null) {
+        proxy.killTaskAttempt(event.getTaskAttemptId());
+      }
+    }
+  }
+
+  private class QueryFinishEventHandler implements 
EventHandler<QueryMasterQueryCompletedEvent> {
+    @Override
+    public void handle(QueryMasterQueryCompletedEvent event) {
+      QueryId queryId = event.getQueryId();
+      LOG.info("Query completion notified from " + queryId);
+
+      while (!isTerminatedState(query.getSynchronizedState())) {
+        try {
+          synchronized (this) {
+            wait(10);
+          }
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+      LOG.info("Query final state: " + query.getSynchronizedState());
+
+      queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId));
+    }
+  }
+
+  private static boolean isTerminatedState(QueryState state) {
+    return
+        state == QueryState.QUERY_SUCCEEDED ||
+        state == QueryState.QUERY_FAILED ||
+        state == QueryState.QUERY_KILLED ||
+        state == QueryState.QUERY_ERROR;
+  }
+
+  public synchronized void startQuery() {
+    StorageManager sm = null;
+    LogicalPlan plan = null;
+    try {
+      if (query != null) {
+        LOG.warn("Query already started");
+        return;
+      }
+      CatalogService catalog = 
getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
+      LogicalPlanner planner = new LogicalPlanner(catalog);
+      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
+      Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
+      jsonExpr = null; // remove the possible OOM
+      plan = planner.createPlan(queryContext, expr);
+
+      StoreType storeType = PlannerUtil.getStoreType(plan);
+      if (storeType != null) {
+        sm = StorageManager.getStorageManager(systemConf, storeType);
+        StorageProperty storageProperty = sm.getStorageProperty();
+        if (storageProperty.isSortedInsert()) {
+          String tableName = PlannerUtil.getStoreTableName(plan);
+          LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+          TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, 
rootNode.getChild());
+          if (tableDesc == null) {
+            throw new VerifyException("Can't get table meta data from catalog: 
" + tableName);
+          }
+          List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = 
sm.getRewriteRules(
+              getQueryTaskContext().getQueryContext(), tableDesc);
+          if (storageSpecifiedRewriteRules != null) {
+            for (LogicalPlanRewriteRule eachRule: 
storageSpecifiedRewriteRules) {
+              optimizer.addRuleAfterToJoinOpt(eachRule);
+            }
+          }
+        }
+      }
+
+      optimizer.optimize(queryContext, plan);
+
+      for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+        LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), 
NodeType.SCAN);
+        if (scanNodes != null) {
+          for (LogicalNode eachScanNode : scanNodes) {
+            ScanNode scanNode = (ScanNode) eachScanNode;
+            tableDescMap.put(scanNode.getCanonicalName(), 
scanNode.getTableDesc());
+          }
+        }
+
+        scanNodes = PlannerUtil.findAllNodes(block.getRoot(), 
NodeType.PARTITIONS_SCAN);
+        if (scanNodes != null) {
+          for (LogicalNode eachScanNode : scanNodes) {
+            ScanNode scanNode = (ScanNode) eachScanNode;
+            tableDescMap.put(scanNode.getCanonicalName(), 
scanNode.getTableDesc());
+          }
+        }
+      }
+      MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+      queryMasterContext.getGlobalPlanner().build(masterPlan);
+
+      query = new Query(queryTaskContext, queryId, querySubmitTime,
+          "", queryTaskContext.getEventHandler(), masterPlan);
+
+      dispatcher.register(QueryEventType.class, query);
+      queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, 
QueryEventType.START));
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+      initError = t;
+
+      if (plan != null && sm != null) {
+        LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+        try {
+          sm.rollbackOutputCommit(rootNode.getChild());
+        } catch (IOException e) {
+          LOG.warn(query.getId() + ", failed processing cleanup storage when 
query failed:" + e.getMessage(), e);
+        }
+      }
+    }
+  }
+
+  private void initStagingDir() throws IOException {
+    Path stagingDir = null;
+    FileSystem defaultFS = 
TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
+
+    try {
+
+      stagingDir = initStagingDir(systemConf, queryId.toString(), 
queryContext);
+
+      // Create a subdirectories
+      LOG.info("The staging dir '" + stagingDir + "' is created.");
+      queryContext.setStagingDir(stagingDir);
+    } catch (IOException ioe) {
+      if (stagingDir != null && defaultFS.exists(stagingDir)) {
+        try {
+          defaultFS.delete(stagingDir, true);
+          LOG.info("The staging directory '" + stagingDir + "' is deleted");
+        } catch (Exception e) {
+          LOG.warn(e.getMessage());
+        }
+      }
+
+      throw ioe;
+    }
+  }
+
+  /**
+   * It initializes the final output and staging directory and sets
+   * them to variables.
+   */
+  public static Path initStagingDir(TajoConf conf, String queryId, 
QueryContext context) throws IOException {
+
+    String realUser;
+    String currentUser;
+    UserGroupInformation ugi;
+    ugi = UserGroupInformation.getLoginUser();
+    realUser = ugi.getShortUserName();
+    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    FileSystem fs;
+    Path stagingDir;
+
+    ////////////////////////////////////////////
+    // Create Output Directory
+    ////////////////////////////////////////////
+
+    String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
+    if (context.isCreateTable() || context.isInsert()) {
+      if (outputPath == null || outputPath.isEmpty()) {
+        // hbase
+        stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), 
queryId);
+      } else {
+        stagingDir = StorageUtil.concatPath(context.getOutputPath(), 
TMP_STAGING_DIR_PREFIX, queryId);
+      }
+    } else {
+      stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+    }
+
+    // initializ
+    fs = stagingDir.getFileSystem(conf);
+
+    if (fs.exists(stagingDir)) {
+      throw new IOException("The staging directory '" + stagingDir + "' 
already exists");
+    }
+    fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+    FileStatus fsStatus = fs.getFileStatus(stagingDir);
+    String owner = fsStatus.getOwner();
+
+    if (!owner.isEmpty() && !(owner.equals(currentUser) || 
owner.equals(realUser))) {
+      throw new IOException("The ownership on the user's query " +
+          "directory " + stagingDir + " is not as expected. " +
+          "It is owned by " + owner + ". The directory must " +
+          "be owned by the submitter " + currentUser + " or " +
+          "by " + realUser);
+    }
+
+    if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+      LOG.info("Permissions on staging directory " + stagingDir + " are " +
+          "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+          "to correct value " + STAGING_DIR_PERMISSION);
+      fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+    }
+
+    Path stagingResultDir = new Path(stagingDir, 
TajoConstants.RESULT_DIR_NAME);
+    fs.mkdirs(stagingResultDir);
+
+    return stagingDir;
+  }
+
+  public Query getQuery() {
+    return query;
+  }
+
+  protected void expireQuerySession() {
+    if(!isTerminatedState(query.getState()) && !(query.getState() == 
QueryState.QUERY_KILL_WAIT)){
+      query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+    }
+  }
+
+  public QueryMasterTaskContext getQueryTaskContext() {
+    return queryTaskContext;
+  }
+
+  public EventHandler getEventHandler() {
+    return queryTaskContext.getEventHandler();
+  }
+
+  public void touchSessionTime() {
+    this.lastClientHeartbeat.set(System.currentTimeMillis());
+  }
+
+  public long getLastClientHeartbeat() {
+    return this.lastClientHeartbeat.get();
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public boolean isInitError() {
+    return initError != null;
+  }
+
+  public QueryState getState() {
+    if(query == null) {
+      if (isInitError()) {
+        return QueryState.QUERY_ERROR;
+      } else {
+        return QueryState.QUERY_NOT_ASSIGNED;
+      }
+    } else {
+      return query.getState();
+    }
+  }
+
+  public Throwable getInitError() {
+    return initError;
+  }
+
+  public String getErrorMessage() {
+    if (isInitError()) {
+      return StringUtils.stringifyException(initError);
+    } else {
+      return null;
+    }
+  }
+
+  public long getQuerySubmitTime() {
+    return this.querySubmitTime;
+  }
+
+  public class QueryMasterTaskContext {
+    EventHandler eventHandler;
+    public QueryMaster.QueryMasterContext getQueryMasterContext() {
+      return queryMasterContext;
+    }
+
+    public Session getSession() {
+      return session;
+    }
+
+    public QueryContext getQueryContext() {
+      return queryContext;
+    }
+
+    public TajoConf getConf() {
+      return systemConf;
+    }
+
+    public Clock getClock() {
+      return queryMasterContext.getClock();
+    }
+
+    public Query getQuery() {
+      return query;
+    }
+
+    public QueryId getQueryId() {
+      return queryId;
+    }
+
+    public Path getStagingDir() {
+      return queryContext.getStagingDir();
+    }
+
+    public synchronized EventHandler getEventHandler() {
+      if(eventHandler == null) {
+        eventHandler = dispatcher.getEventHandler();
+      }
+      return eventHandler;
+    }
+
+    public AsyncDispatcher getDispatcher() {
+      return dispatcher;
+    }
+
+    public Stage getStage(ExecutionBlockId id) {
+      return query.getStage(id);
+    }
+
+    public Map<String, TableDesc> getTableDescMap() {
+      return tableDescMap;
+    }
+
+    public float getProgress() {
+      if(query == null) {
+        return 0.0f;
+      }
+      return query.getProgress();
+    }
+
+    public AbstractResourceAllocator getResourceAllocator() {
+      return resourceAllocator;
+    }
+
+    public TajoMetrics getQueryMetrics() {
+      return queryMetrics;
+    }
+  }
+}
\ No newline at end of file

Reply via email to