http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java new file mode 100644 index 0000000..76df397 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -0,0 +1,631 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tajo.*; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.global.GlobalPlanner; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ha.HAServiceUtil; +import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.event.QueryStartEvent; +import org.apache.tajo.master.event.QueryStopEvent; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.history.QueryHistory; +import org.apache.tajo.worker.TajoWorker; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat; +import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse; + +// TODO - when exception, send error status to QueryJobManager +public class QueryMaster extends CompositeService implements EventHandler { + private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName()); + + private int querySessionTimeout; + + private Clock clock; + + private AsyncDispatcher dispatcher; + + private GlobalPlanner globalPlanner; + + private TajoConf systemConf; + + private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap(); + + private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap(); + + private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread; + + private AtomicBoolean queryMasterStop = new AtomicBoolean(false); + + private QueryMasterContext queryMasterContext; + + private QueryContext queryContext; + + private QueryHeartbeatThread queryHeartbeatThread; + + private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread; + + private TajoWorker.WorkerContext workerContext; + + private RpcConnectionPool connPool; + + private ExecutorService eventExecutor; + + public QueryMaster(TajoWorker.WorkerContext workerContext) { + super(QueryMaster.class.getName()); + this.workerContext = workerContext; + } + + public void init(Configuration conf) { + LOG.info("QueryMaster init"); + try { + this.systemConf = (TajoConf)conf; + this.connPool = RpcConnectionPool.getPool(systemConf); + + querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT); + queryMasterContext = new QueryMasterContext(systemConf); + + clock = new SystemClock(); + + this.dispatcher = new AsyncDispatcher(); + addIfService(dispatcher); + + globalPlanner = new GlobalPlanner(systemConf, workerContext); + + dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler()); + dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler()); + + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + throw new RuntimeException(t); + } + super.init(conf); + } + + @Override + public void start() { + LOG.info("QueryMaster start"); + + queryHeartbeatThread = new QueryHeartbeatThread(); + queryHeartbeatThread.start(); + + clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread(); + clientSessionTimeoutCheckThread.start(); + + finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread(); + finishedQueryMasterTaskCleanThread.start(); + + eventExecutor = Executors.newSingleThreadExecutor(); + super.start(); + } + + @Override + public void stop() { + if(queryMasterStop.getAndSet(true)){ + return; + } + + if(queryHeartbeatThread != null) { + queryHeartbeatThread.interrupt(); + } + + if(clientSessionTimeoutCheckThread != null) { + clientSessionTimeoutCheckThread.interrupt(); + } + + if(finishedQueryMasterTaskCleanThread != null) { + finishedQueryMasterTaskCleanThread.interrupt(); + } + + if(eventExecutor != null){ + eventExecutor.shutdown(); + } + + super.stop(); + + LOG.info("QueryMaster stop"); + if(queryMasterContext.getWorkerContext().isYarnContainerMode()) { + queryMasterContext.getWorkerContext().stopWorker(true); + } + } + + protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) { + StringBuilder cleanupMessage = new StringBuilder(); + String prefix = ""; + for (TajoIdProtos.ExecutionBlockIdProto eachEbId: executionBlockIds) { + cleanupMessage.append(prefix).append(new ExecutionBlockId(eachEbId).toString()); + prefix = ","; + } + LOG.info("cleanup executionBlocks: " + cleanupMessage); + NettyClientBase rpc = null; + List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker(); + TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder(); + builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds)); + TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build(); + + for (TajoMasterProtocol.WorkerResourceProto worker : workers) { + try { + TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); + rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), + TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); + + tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get()); + } catch (Exception e) { + continue; + } finally { + connPool.releaseConnection(rpc); + } + } + } + + private void cleanup(QueryId queryId) { + LOG.info("cleanup query resources : " + queryId); + NettyClientBase rpc = null; + List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker(); + + for (TajoMasterProtocol.WorkerResourceProto worker : workers) { + try { + TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); + rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), + TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); + + tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get()); + } catch (Exception e) { + LOG.error(e.getMessage()); + } finally { + connPool.releaseConnection(rpc); + } + } + } + + public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() { + + NettyClientBase rpc = null; + try { + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( + HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress( + HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + + TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub(); + + CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack = + new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>(); + masterService.getAllWorkerResource(callBack.getController(), + PrimitiveProtos.NullProto.getDefaultInstance(), callBack); + + TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS); + return workerResourcesRequest.getWorkerResourcesList(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } finally { + connPool.releaseConnection(rpc); + } + return new ArrayList<TajoMasterProtocol.WorkerResourceProto>(); + } + + public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) { + LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId); + NettyClientBase tmClient = null; + try { + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( + HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress( + HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + + TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); + + TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder() + .setConnectionInfo(workerContext.getConnectionInfo().getProto()) + .setState(state) + .setQueryId(queryId.getProto()); + + CallFuture<TajoHeartbeatResponse> callBack = + new CallFuture<TajoHeartbeatResponse>(); + + masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } finally { + connPool.releaseConnection(tmClient); + } + } + + @Override + public void handle(Event event) { + dispatcher.getEventHandler().handle(event); + } + + public Query getQuery(QueryId queryId) { + return queryMasterTasks.get(queryId).getQuery(); + } + + public QueryMasterTask getQueryMasterTask(QueryId queryId) { + return queryMasterTasks.get(queryId); + } + + public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) { + QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId); + if(queryMasterTask != null) { + return queryMasterTask; + } else { + if(includeFinished) { + return finishedQueryMasterTasks.get(queryId); + } else { + return null; + } + } + } + + public QueryMasterContext getContext() { + return this.queryMasterContext; + } + + public Collection<QueryMasterTask> getQueryMasterTasks() { + return queryMasterTasks.values(); + } + + public Collection<QueryMasterTask> getFinishedQueryMasterTasks() { + return finishedQueryMasterTasks.values(); + } + + public class QueryMasterContext { + private TajoConf conf; + + public QueryMasterContext(TajoConf conf) { + this.conf = conf; + } + + public TajoConf getConf() { + return conf; + } + + public ExecutorService getEventExecutor(){ + return eventExecutor; + } + + public AsyncDispatcher getDispatcher() { + return dispatcher; + } + + public Clock getClock() { + return clock; + } + + public QueryMaster getQueryMaster() { + return QueryMaster.this; + } + + public GlobalPlanner getGlobalPlanner() { + return globalPlanner; + } + + public TajoWorker.WorkerContext getWorkerContext() { + return workerContext; + } + + public EventHandler getEventHandler() { + return dispatcher.getEventHandler(); + } + + public void stopQuery(QueryId queryId) { + QueryMasterTask queryMasterTask = queryMasterTasks.remove(queryId); + if(queryMasterTask == null) { + LOG.warn("No query info:" + queryId); + return; + } + + finishedQueryMasterTasks.put(queryId, queryMasterTask); + + TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask); + CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>(); + + NettyClientBase tmClient = null; + try { + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + + TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); + masterClientService.heartbeat(future.getController(), queryHeartbeat, future); + } catch (Exception e) { + //this function will be closed in new thread. + //When tajo do stop cluster, tajo master maybe throw closed connection exception + + LOG.error(e.getMessage(), e); + } finally { + connPool.releaseConnection(tmClient); + } + + try { + queryMasterTask.stop(); + if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) { + cleanup(queryId); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + Query query = queryMasterTask.getQuery(); + if (query != null) { + QueryHistory queryHisory = query.getQueryHistory(); + if (queryHisory != null) { + query.context.getQueryMasterContext().getWorkerContext(). + getTaskHistoryWriter().appendHistory(queryHisory); + } + } + if(workerContext.isYarnContainerMode()) { + stop(); + } + } + } + + private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) { + TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder(); + + builder.setConnectionInfo(workerContext.getConnectionInfo().getProto()); + builder.setQueryId(queryMasterTask.getQueryId().getProto()); + builder.setState(queryMasterTask.getState()); + if (queryMasterTask.getQuery() != null) { + if (queryMasterTask.getQuery().getResultDesc() != null) { + builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto()); + } + builder.setQueryProgress(queryMasterTask.getQuery().getProgress()); + builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime()); + } + return builder.build(); + } + + private class QueryStartEventHandler implements EventHandler<QueryStartEvent> { + @Override + public void handle(QueryStartEvent event) { + LOG.info("Start QueryStartEventHandler:" + event.getQueryId()); + QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext, + event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getLogicalPlanJson()); + + synchronized(queryMasterTasks) { + queryMasterTasks.put(event.getQueryId(), queryMasterTask); + } + + queryMasterTask.init(systemConf); + if (!queryMasterTask.isInitError()) { + queryMasterTask.start(); + } + + queryContext = event.getQueryContext(); + + if (queryMasterTask.isInitError()) { + queryMasterContext.stopQuery(queryMasterTask.getQueryId()); + return; + } + } + } + + private class QueryStopEventHandler implements EventHandler<QueryStopEvent> { + @Override + public void handle(QueryStopEvent event) { + queryMasterContext.stopQuery(event.getQueryId()); + } + } + + class QueryHeartbeatThread extends Thread { + public QueryHeartbeatThread() { + super("QueryHeartbeatThread"); + } + + @Override + public void run() { + LOG.info("Start QueryMaster heartbeat thread"); + while(!queryMasterStop.get()) { + List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>(); + synchronized(queryMasterTasks) { + tempTasks.addAll(queryMasterTasks.values()); + } + synchronized(queryMasterTasks) { + for(QueryMasterTask eachTask: tempTasks) { + NettyClientBase tmClient; + try { + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( + HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress( + HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + + TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); + + CallFuture<TajoHeartbeatResponse> callBack = + new CallFuture<TajoHeartbeatResponse>(); + + TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask); + masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack); + } catch (Throwable t) { + t.printStackTrace(); + } + } + } + synchronized(queryMasterStop) { + try { + queryMasterStop.wait(2000); + } catch (InterruptedException e) { + break; + } + } + } + LOG.info("QueryMaster heartbeat thread stopped"); + } + } + + class ClientSessionTimeoutCheckThread extends Thread { + public void run() { + LOG.info("ClientSessionTimeoutCheckThread started"); + while(!queryMasterStop.get()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } + List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>(); + synchronized(queryMasterTasks) { + tempTasks.addAll(queryMasterTasks.values()); + } + + for(QueryMasterTask eachTask: tempTasks) { + if(!eachTask.isStopped()) { + try { + long lastHeartbeat = eachTask.getLastClientHeartbeat(); + long time = System.currentTimeMillis() - lastHeartbeat; + if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) { + LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query session timeout: " + time + " ms"); + eachTask.expireQuerySession(); + } + } catch (Exception e) { + LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e); + } + } + } + } + } + } + + class FinishedQueryMasterTaskCleanThread extends Thread { + public void run() { + int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD); + LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime); + while(!queryMasterStop.get()) { + try { + Thread.sleep(60 * 1000 * 60); // hourly + } catch (InterruptedException e) { + break; + } + try { + long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000; + cleanExpiredFinishedQueryMasterTask(expireTime); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } + + private void cleanExpiredFinishedQueryMasterTask(long expireTime) { + synchronized(finishedQueryMasterTasks) { + List<QueryId> expiredQueryIds = new ArrayList<QueryId>(); + for(Map.Entry<QueryId, QueryMasterTask> entry: finishedQueryMasterTasks.entrySet()) { + if(entry.getValue().getStartTime() < expireTime) { + expiredQueryIds.add(entry.getKey()); + } + } + + for(QueryId eachId: expiredQueryIds) { + finishedQueryMasterTasks.remove(eachId); + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java new file mode 100644 index 0000000..4a91326 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import com.google.common.base.Preconditions; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.tajo.*; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.QueryMasterProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.master.event.*; +import org.apache.tajo.session.Session; +import org.apache.tajo.rpc.AsyncRpcServer; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.util.NetUtils; +import org.apache.tajo.worker.TajoWorker; + +import java.net.InetSocketAddress; + +public class QueryMasterManagerService extends CompositeService + implements QueryMasterProtocol.QueryMasterProtocolService.Interface { + private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName()); + + private AsyncRpcServer rpcServer; + private InetSocketAddress bindAddr; + private String addr; + private int port; + + private QueryMaster queryMaster; + + private TajoWorker.WorkerContext workerContext; + + public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) { + super(QueryMasterManagerService.class.getName()); + this.workerContext = workerContext; + this.port = port; + } + + public QueryMaster getQueryMaster() { + return queryMaster; + } + + @Override + public void init(Configuration conf) { + Preconditions.checkArgument(conf instanceof TajoConf); + TajoConf tajoConf = (TajoConf) conf; + try { + // Setup RPC server + InetSocketAddress initIsa = + new InetSocketAddress("0.0.0.0", port); + if (initIsa.getAddress() == null) { + throw new IllegalArgumentException("Failed resolve of " + initIsa); + } + + int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM); + this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum); + this.rpcServer.start(); + + this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress()); + this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort(); + + this.port = bindAddr.getPort(); + + queryMaster = new QueryMaster(workerContext); + addService(queryMaster); + + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + // Get the master address + LOG.info("QueryMasterManagerService is bind to " + addr); + ((TajoConf)conf).setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr); + + super.init(conf); + } + + @Override + public void start() { + super.start(); + } + + @Override + public void stop() { + if(rpcServer != null) { + rpcServer.shutdown(); + } + LOG.info("QueryMasterManagerService stopped"); + super.stop(); + } + + public InetSocketAddress getBindAddr() { + return bindAddr; + } + + public String getHostAndPort() { + return bindAddr.getHostName() + ":" + bindAddr.getPort(); + } + + @Override + public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request, + RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) { + try { + ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId()); + QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId()); + + if(queryMasterTask == null || queryMasterTask.isStopped()) { + done.run(DefaultTaskScheduler.stopTaskRunnerReq); + } else { + TajoContainerId cid = + queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId()); + LOG.debug("getTask:" + cid + ", ebId:" + ebId); + queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done)); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + @Override + public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request, + RpcCallback<PrimitiveProtos.BoolProto> done) { + try { + QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId()); + TaskAttemptId attemptId = new TaskAttemptId(request.getId()); + QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId); + if (queryMasterTask == null) { + queryMasterTask = queryMaster.getQueryMasterTask(queryId, true); + } + Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId()); + Task task = sq.getTask(attemptId.getTaskId()); + TaskAttempt attempt = task.getAttempt(attemptId.getId()); + + if(LOG.isDebugEnabled()){ + LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name())); + } + + if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) { + LOG.warn(attemptId + " Killed"); + attempt.handle( + new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED)); + } else { + queryMasterTask.getEventHandler().handle( + new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request)); + } + done.run(TajoWorker.TRUE_PROTO); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + done.run(TajoWorker.FALSE_PROTO); + } + } + + @Override + public void ping(RpcController controller, + TajoIdProtos.ExecutionBlockIdProto requestProto, + RpcCallback<PrimitiveProtos.BoolProto> done) { + done.run(TajoWorker.TRUE_PROTO); + } + + @Override + public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report, + RpcCallback<PrimitiveProtos.BoolProto> done) { + try { + QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( + new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); + if (queryMasterTask != null) { + queryMasterTask.handleTaskFailed(report); + } else { + LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId())); + } + done.run(TajoWorker.TRUE_PROTO); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + done.run(TajoWorker.FALSE_PROTO); + } + } + + @Override + public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report, + RpcCallback<PrimitiveProtos.BoolProto> done) { + try { + QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( + new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); + if (queryMasterTask != null) { + queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report)); + } + done.run(TajoWorker.TRUE_PROTO); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + done.run(TajoWorker.FALSE_PROTO); + } + } + + @Override + public void doneExecutionBlock( + RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request, + RpcCallback<PrimitiveProtos.BoolProto> done) { + QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId())); + if (queryMasterTask != null) { + ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId()); + queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request); + } + done.run(TajoWorker.TRUE_PROTO); + } + + @Override + public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request, + RpcCallback<PrimitiveProtos.BoolProto> done) { + QueryId queryId = new QueryId(request); + QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId); + if (queryMasterTask != null) { + Query query = queryMasterTask.getQuery(); + if (query != null) { + query.handle(new QueryEvent(queryId, QueryEventType.KILL)); + } + } + } + + @Override + public void executeQuery(RpcController controller, + TajoWorkerProtocol.QueryExecutionRequestProto request, + RpcCallback<PrimitiveProtos.BoolProto> done) { + try { + workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc(); + + QueryId queryId = new QueryId(request.getQueryId()); + LOG.info("Receive executeQuery request:" + queryId); + queryMaster.handle(new QueryStartEvent(queryId, + new Session(request.getSession()), + new QueryContext(workerContext.getQueryMaster().getContext().getConf(), + request.getQueryContext()), request.getExprInJson().getValue(), + request.getLogicalPlanJson().getValue())); + done.run(TajoWorker.TRUE_PROTO); + } catch (Exception e) { + workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc(); + LOG.error(e.getMessage(), e); + done.run(TajoWorker.FALSE_PROTO); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java new file mode 100644 index 0000000..bab5903 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -0,0 +1,638 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.tajo.*; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.algebra.JsonHelper; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.ha.HAServiceUtil; +import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.TajoContainerProxy; +import org.apache.tajo.master.event.*; +import org.apache.tajo.master.rm.TajoWorkerResourceManager; +import org.apache.tajo.session.Session; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.plan.verifier.VerifyException; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.StorageProperty; +import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.util.metrics.TajoMetrics; +import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; +import org.apache.tajo.worker.AbstractResourceAllocator; +import org.apache.tajo.worker.TajoResourceAllocator; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.tajo.TajoProtos.QueryState; + +public class QueryMasterTask extends CompositeService { + private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName()); + + // query submission directory is private! + final public static FsPermission STAGING_DIR_PERMISSION = + FsPermission.createImmutable((short) 0700); // rwx-------- + + public static final String TMP_STAGING_DIR_PREFIX = ".staging"; + + private QueryId queryId; + + private Session session; + + private QueryContext queryContext; + + private QueryMasterTaskContext queryTaskContext; + + private QueryMaster.QueryMasterContext queryMasterContext; + + private Query query; + + private MasterPlan masterPlan; + + private String jsonExpr; + + private String logicalPlanJson; + + private AsyncDispatcher dispatcher; + + private final long querySubmitTime; + + private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>(); + + private TajoConf systemConf; + + private AtomicLong lastClientHeartbeat = new AtomicLong(-1); + + private AbstractResourceAllocator resourceAllocator; + + private AtomicBoolean stopped = new AtomicBoolean(false); + + private TajoMetrics queryMetrics; + + private Throwable initError; + + private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics = + new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>(); + + public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext, + QueryId queryId, Session session, QueryContext queryContext, String jsonExpr, + String logicalPlanJson) { + + super(QueryMasterTask.class.getName()); + this.queryMasterContext = queryMasterContext; + this.queryId = queryId; + this.session = session; + this.queryContext = queryContext; + this.jsonExpr = jsonExpr; + this.logicalPlanJson = logicalPlanJson; + this.querySubmitTime = System.currentTimeMillis(); + } + + @Override + public void init(Configuration conf) { + systemConf = (TajoConf)conf; + + try { + queryTaskContext = new QueryMasterTaskContext(); + String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS); + + if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) { + resourceAllocator = new TajoResourceAllocator(queryTaskContext); + } else { + throw new UnimplementedException(resourceManagerClassName + " is not supported yet"); + } + addService(resourceAllocator); + + dispatcher = new AsyncDispatcher(); + addService(dispatcher); + + dispatcher.register(StageEventType.class, new StageEventDispatcher()); + dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); + dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); + dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler()); + dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher()); + dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler()); + + initStagingDir(); + + queryMetrics = new TajoMetrics(queryId.toString()); + + super.init(systemConf); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + initError = t; + } + } + + public boolean isStopped() { + return stopped.get(); + } + + @Override + public void start() { + startQuery(); + super.start(); + } + + @Override + public void stop() { + + if(stopped.getAndSet(true)) { + return; + } + + LOG.info("Stopping QueryMasterTask:" + queryId); + + try { + resourceAllocator.stop(); + } catch (Throwable t) { + LOG.fatal(t.getMessage(), t); + } + + RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf()); + NettyClientBase tmClient = null; + try { + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( + HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress( + HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } finally { + connPool.releaseConnection(tmClient); + } + + super.stop(); + + //TODO change report to tajo master + if (queryMetrics != null) { + queryMetrics.report(new MetricsConsoleReporter()); + } + + LOG.info("Stopped QueryMasterTask:" + queryId); + } + + public void handleTaskRequestEvent(TaskRequestEvent event) { + ExecutionBlockId id = event.getExecutionBlockId(); + query.getStage(id).handleTaskRequestEvent(event); + } + + public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) { + synchronized(diagnostics) { + if (diagnostics.size() < 10) { + diagnostics.add(report); + } + } + + getEventHandler().handle(new TaskFatalErrorEvent(report)); + } + + public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() { + synchronized(diagnostics) { + return Collections.unmodifiableCollection(diagnostics); + } + } + + private class StageEventDispatcher implements EventHandler<StageEvent> { + public void handle(StageEvent event) { + ExecutionBlockId id = event.getStageId(); + if(LOG.isDebugEnabled()) { + LOG.debug("StageEventDispatcher:" + id + "," + event.getType()); + } + query.getStage(id).handle(event); + } + } + + private class TaskEventDispatcher + implements EventHandler<TaskEvent> { + public void handle(TaskEvent event) { + TaskId taskId = event.getTaskId(); + if(LOG.isDebugEnabled()) { + LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType()); + } + Task task = query.getStage(taskId.getExecutionBlockId()). + getTask(taskId); + task.handle(event); + } + } + + private class TaskAttemptEventDispatcher + implements EventHandler<TaskAttemptEvent> { + public void handle(TaskAttemptEvent event) { + TaskAttemptId attemptId = event.getTaskAttemptId(); + Stage stage = query.getStage(attemptId.getTaskId().getExecutionBlockId()); + Task task = stage.getTask(attemptId.getTaskId()); + TaskAttempt attempt = task.getAttempt(attemptId); + attempt.handle(event); + } + } + + private class TaskSchedulerDispatcher + implements EventHandler<TaskSchedulerEvent> { + public void handle(TaskSchedulerEvent event) { + Stage stage = query.getStage(event.getExecutionBlockId()); + stage.getTaskScheduler().handle(event); + } + } + + private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> { + @Override + public void handle(LocalTaskEvent event) { + TajoContainerProxy proxy = (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId()); + if (proxy != null) { + proxy.killTaskAttempt(event.getTaskAttemptId()); + } + } + } + + private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> { + @Override + public void handle(QueryMasterQueryCompletedEvent event) { + QueryId queryId = event.getQueryId(); + LOG.info("Query completion notified from " + queryId); + + while (!isTerminatedState(query.getSynchronizedState())) { + try { + synchronized (this) { + wait(10); + } + } catch (InterruptedException e) { + LOG.error(e); + } + } + LOG.info("Query final state: " + query.getSynchronizedState()); + + queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId)); + } + } + + private static boolean isTerminatedState(QueryState state) { + return + state == QueryState.QUERY_SUCCEEDED || + state == QueryState.QUERY_FAILED || + state == QueryState.QUERY_KILLED || + state == QueryState.QUERY_ERROR; + } + + public synchronized void startQuery() { + StorageManager sm = null; + LogicalPlan plan = null; + try { + if (query != null) { + LOG.warn("Query already started"); + return; + } + CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog(); + LogicalPlanner planner = new LogicalPlanner(catalog); + LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); + Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); + jsonExpr = null; // remove the possible OOM + plan = planner.createPlan(queryContext, expr); + + StoreType storeType = PlannerUtil.getStoreType(plan); + if (storeType != null) { + sm = StorageManager.getStorageManager(systemConf, storeType); + StorageProperty storageProperty = sm.getStorageProperty(); + if (storageProperty.isSortedInsert()) { + String tableName = PlannerUtil.getStoreTableName(plan); + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); + if (tableDesc == null) { + throw new VerifyException("Can't get table meta data from catalog: " + tableName); + } + List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules( + getQueryTaskContext().getQueryContext(), tableDesc); + if (storageSpecifiedRewriteRules != null) { + for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) { + optimizer.addRuleAfterToJoinOpt(eachRule); + } + } + } + } + + optimizer.optimize(queryContext, plan); + + for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) { + LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN); + if (scanNodes != null) { + for (LogicalNode eachScanNode : scanNodes) { + ScanNode scanNode = (ScanNode) eachScanNode; + tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + } + } + + scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN); + if (scanNodes != null) { + for (LogicalNode eachScanNode : scanNodes) { + ScanNode scanNode = (ScanNode) eachScanNode; + tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + } + } + } + MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); + queryMasterContext.getGlobalPlanner().build(masterPlan); + + query = new Query(queryTaskContext, queryId, querySubmitTime, + "", queryTaskContext.getEventHandler(), masterPlan); + + dispatcher.register(QueryEventType.class, query); + queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START)); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + initError = t; + + if (plan != null && sm != null) { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + try { + sm.rollbackOutputCommit(rootNode.getChild()); + } catch (IOException e) { + LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); + } + } + } + } + + private void initStagingDir() throws IOException { + Path stagingDir = null; + FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf); + + try { + + stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext); + + // Create a subdirectories + LOG.info("The staging dir '" + stagingDir + "' is created."); + queryContext.setStagingDir(stagingDir); + } catch (IOException ioe) { + if (stagingDir != null && defaultFS.exists(stagingDir)) { + try { + defaultFS.delete(stagingDir, true); + LOG.info("The staging directory '" + stagingDir + "' is deleted"); + } catch (Exception e) { + LOG.warn(e.getMessage()); + } + } + + throw ioe; + } + } + + /** + * It initializes the final output and staging directory and sets + * them to variables. + */ + public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException { + + String realUser; + String currentUser; + UserGroupInformation ugi; + ugi = UserGroupInformation.getLoginUser(); + realUser = ugi.getShortUserName(); + currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); + + FileSystem fs; + Path stagingDir; + + //////////////////////////////////////////// + // Create Output Directory + //////////////////////////////////////////// + + String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, ""); + if (context.isCreateTable() || context.isInsert()) { + if (outputPath == null || outputPath.isEmpty()) { + // hbase + stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); + } else { + stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId); + } + } else { + stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); + } + + // initializ + fs = stagingDir.getFileSystem(conf); + + if (fs.exists(stagingDir)) { + throw new IOException("The staging directory '" + stagingDir + "' already exists"); + } + fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); + FileStatus fsStatus = fs.getFileStatus(stagingDir); + String owner = fsStatus.getOwner(); + + if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) { + throw new IOException("The ownership on the user's query " + + "directory " + stagingDir + " is not as expected. " + + "It is owned by " + owner + ". The directory must " + + "be owned by the submitter " + currentUser + " or " + + "by " + realUser); + } + + if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) { + LOG.info("Permissions on staging directory " + stagingDir + " are " + + "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + + "to correct value " + STAGING_DIR_PERMISSION); + fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); + } + + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + fs.mkdirs(stagingResultDir); + + return stagingDir; + } + + public Query getQuery() { + return query; + } + + protected void expireQuerySession() { + if(!isTerminatedState(query.getState()) && !(query.getState() == QueryState.QUERY_KILL_WAIT)){ + query.handle(new QueryEvent(queryId, QueryEventType.KILL)); + } + } + + public QueryMasterTaskContext getQueryTaskContext() { + return queryTaskContext; + } + + public EventHandler getEventHandler() { + return queryTaskContext.getEventHandler(); + } + + public void touchSessionTime() { + this.lastClientHeartbeat.set(System.currentTimeMillis()); + } + + public long getLastClientHeartbeat() { + return this.lastClientHeartbeat.get(); + } + + public QueryId getQueryId() { + return queryId; + } + + public boolean isInitError() { + return initError != null; + } + + public QueryState getState() { + if(query == null) { + if (isInitError()) { + return QueryState.QUERY_ERROR; + } else { + return QueryState.QUERY_NOT_ASSIGNED; + } + } else { + return query.getState(); + } + } + + public Throwable getInitError() { + return initError; + } + + public String getErrorMessage() { + if (isInitError()) { + return StringUtils.stringifyException(initError); + } else { + return null; + } + } + + public long getQuerySubmitTime() { + return this.querySubmitTime; + } + + public class QueryMasterTaskContext { + EventHandler eventHandler; + public QueryMaster.QueryMasterContext getQueryMasterContext() { + return queryMasterContext; + } + + public Session getSession() { + return session; + } + + public QueryContext getQueryContext() { + return queryContext; + } + + public TajoConf getConf() { + return systemConf; + } + + public Clock getClock() { + return queryMasterContext.getClock(); + } + + public Query getQuery() { + return query; + } + + public QueryId getQueryId() { + return queryId; + } + + public Path getStagingDir() { + return queryContext.getStagingDir(); + } + + public synchronized EventHandler getEventHandler() { + if(eventHandler == null) { + eventHandler = dispatcher.getEventHandler(); + } + return eventHandler; + } + + public AsyncDispatcher getDispatcher() { + return dispatcher; + } + + public Stage getStage(ExecutionBlockId id) { + return query.getStage(id); + } + + public Map<String, TableDesc> getTableDescMap() { + return tableDescMap; + } + + public float getProgress() { + if(query == null) { + return 0.0f; + } + return query.getProgress(); + } + + public AbstractResourceAllocator getResourceAllocator() { + return resourceAllocator; + } + + public TajoMetrics getQueryMetrics() { + return queryMetrics; + } + } +} \ No newline at end of file
