http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java deleted file mode 100644 index 559fc14..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - - -import com.google.gson.annotations.Expose; -import org.apache.tajo.QueryId; -import org.apache.tajo.TajoProtos; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.engine.json.CoreGsonHelper; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.ClientProtos.QueryInfoProto; -import org.apache.tajo.json.GsonObject; -import org.apache.tajo.util.TajoIdUtils; -import org.apache.tajo.util.history.History; - -public class QueryInfo implements GsonObject, History { - private QueryId queryId; - @Expose - private QueryContext context; - @Expose - private String sql; - @Expose - private volatile TajoProtos.QueryState queryState; - @Expose - private volatile float progress; - @Expose - private volatile long startTime; - @Expose - private volatile long finishTime; - @Expose - private String lastMessage; - @Expose - private String hostNameOfQM; - @Expose - private int queryMasterPort; - @Expose - private int queryMasterClientPort; - @Expose - private int queryMasterInfoPort; - @Expose - private String queryIdStr; - @Expose - private volatile TableDesc resultDesc; - - private String jsonExpr; - - public QueryInfo(QueryId queryId) { - this(queryId, null, null, null); - } - - public QueryInfo(QueryId queryId, QueryContext queryContext, String sql, String jsonExpr) { - this.queryId = queryId; - this.queryIdStr = queryId.toString(); - this.context = queryContext; - this.sql = sql; - this.jsonExpr = jsonExpr; - - this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT; - } - - public QueryId getQueryId() { - return queryId; - } - - public QueryContext getQueryContext() { - return context; - } - - public String getSql() { - return sql; - } - - public String getQueryMasterHost() { - return hostNameOfQM; - } - - public void setQueryMaster(String hostName) { - this.hostNameOfQM = hostName; - } - - public int getQueryMasterInfoPort() { - return queryMasterInfoPort; - } - - public void setQueryMasterInfoPort(int queryMasterInfoPort) { - this.queryMasterInfoPort = queryMasterInfoPort; - } - - public void setQueryMasterPort(int port) { - this.queryMasterPort = port; - } - - public int getQueryMasterPort() { - return queryMasterPort; - } - - public void setQueryMasterclientPort(int port) { - queryMasterClientPort = port; - } - - public int getQueryMasterClientPort() { - return queryMasterClientPort; - } - - public TajoProtos.QueryState getQueryState() { - return queryState; - } - - public void setQueryState(TajoProtos.QueryState queryState) { - this.queryState = queryState; - } - - public long getStartTime() { - return startTime; - } - - public void setStartTime(long startTime) { - this.startTime = startTime; - } - - public long getFinishTime() { - return finishTime; - } - - public void setFinishTime(long finishTime) { - this.finishTime = finishTime; - } - - public String getLastMessage() { - return lastMessage; - } - - public void setLastMessage(String lastMessage) { - this.lastMessage = lastMessage; - } - - public float getProgress() { - return progress; - } - - public void setProgress(float progress) { - this.progress = progress; - } - - public void setResultDesc(TableDesc result) { - this.resultDesc = result; - } - - public boolean hasResultdesc() { - return resultDesc != null; - } - - public TableDesc getResultDesc() { - return resultDesc; - } - - @Override - public String toString() { - return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster=" - + getQueryMasterHost(); - } - - public String getJsonExpr() { - return jsonExpr; - } - - @Override - public String toJson() { - return CoreGsonHelper.toJson(this, QueryInfo.class); - } - - @Override - public HistoryType getHistoryType() { - return HistoryType.QUERY_SUMMARY; - } - - public static QueryInfo fromJson(String json) { - QueryInfo queryInfo = CoreGsonHelper.fromJson(json, QueryInfo.class); - queryInfo.queryId = TajoIdUtils.parseQueryId(queryInfo.queryIdStr); - return queryInfo; - } - - public String getQueryIdStr() { - return queryIdStr; - } - - public QueryInfoProto getProto() { - QueryInfoProto.Builder builder = QueryInfoProto.newBuilder(); - - builder.setQueryId(queryId.toString()) - .setQueryState(queryState) - .setContextVars(context.getProto()) - .setProgress(progress) - .setStartTime(startTime) - .setFinishTime(finishTime) - .setQueryMasterPort(queryMasterPort) - .setQueryMasterClientPort(queryMasterClientPort) - .setQueryMasterInfoPort(queryMasterInfoPort); - - if (resultDesc != null) { - builder.setResultDesc(resultDesc.getProto()); - } - - if (sql != null) { - builder.setSql(sql); - } - - if (lastMessage != null) { - builder.setLastMessage(lastMessage); - } - - if (hostNameOfQM != null) { - builder.setHostNameOfQM(hostNameOfQM); - } - - return builder.build(); - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java deleted file mode 100644 index ce30ec7..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import org.apache.hadoop.yarn.event.AbstractEvent; - -public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> { - private QueryInfo queryInfo; - - public QueryJobEvent(Type type, QueryInfo queryInfo) { - super(type); - - this.queryInfo = queryInfo; - } - - public QueryInfo getQueryInfo() { - return this.queryInfo; - } - - public enum Type { - QUERY_JOB_START, - QUERY_JOB_HEARTBEAT, - QUERY_JOB_FINISH, - QUERY_JOB_STOP, - QUERY_MASTER_START, - QUERY_MASTER_STOP, - QUERY_JOB_KILL - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java deleted file mode 100644 index 13f6456..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ /dev/null @@ -1,310 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TajoProtos; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.TajoMasterProtocol; -import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.session.Session; -import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.scheduler.SimpleFifoScheduler; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -public class QueryJobManager extends CompositeService { - private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName()); - - // TajoMaster Context - private final TajoMaster.MasterContext masterContext; - - private AsyncDispatcher dispatcher; - - private SimpleFifoScheduler scheduler; - - private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap(); - - private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap(); - - private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE); - private AtomicLong maxExecutionTime = new AtomicLong(); - private AtomicLong avgExecutionTime = new AtomicLong(); - private AtomicLong executedQuerySize = new AtomicLong(); - - public QueryJobManager(final TajoMaster.MasterContext masterContext) { - super(QueryJobManager.class.getName()); - this.masterContext = masterContext; - } - - @Override - public void init(Configuration conf) { - try { - this.dispatcher = new AsyncDispatcher(); - addService(this.dispatcher); - - this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler()); - - this.scheduler = new SimpleFifoScheduler(this); - } catch (Exception e) { - catchException(null, e); - } - - super.init(conf); - } - - @Override - public void stop() { - synchronized(runningQueries) { - for(QueryInProgress eachQueryInProgress: runningQueries.values()) { - eachQueryInProgress.stop(); - } - } - this.scheduler.stop(); - super.stop(); - } - - @Override - public void start() { - this.scheduler.start(); - super.start(); - } - - public EventHandler getEventHandler() { - return dispatcher.getEventHandler(); - } - - public Collection<QueryInProgress> getSubmittedQueries() { - synchronized (submittedQueries){ - return Collections.unmodifiableCollection(submittedQueries.values()); - } - } - - public Collection<QueryInProgress> getRunningQueries() { - synchronized (runningQueries){ - return Collections.unmodifiableCollection(runningQueries.values()); - } - } - - public synchronized Collection<QueryInfo> getFinishedQueries() { - try { - return this.masterContext.getHistoryReader().getQueries(null); - } catch (Throwable e) { - LOG.error(e); - return Lists.newArrayList(); - } - } - - - public synchronized QueryInfo getFinishedQuery(QueryId queryId) { - try { - return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString()); - } catch (Throwable e) { - LOG.error(e); - return null; - } - } - - public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql, - String jsonExpr, LogicalRootNode plan) - throws Exception { - QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId()); - QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql, - jsonExpr, plan); - - synchronized (submittedQueries) { - queryInProgress.getQueryInfo().setQueryMaster(""); - submittedQueries.put(queryInProgress.getQueryId(), queryInProgress); - } - - scheduler.addQuery(queryInProgress); - return queryInProgress.getQueryInfo(); - } - - public QueryInfo startQueryJob(QueryId queryId) throws Exception { - - QueryInProgress queryInProgress; - - synchronized (submittedQueries) { - queryInProgress = submittedQueries.remove(queryId); - } - - synchronized (runningQueries) { - runningQueries.put(queryInProgress.getQueryId(), queryInProgress); - } - - addService(queryInProgress); - queryInProgress.init(getConfig()); - queryInProgress.start(); - - if (!queryInProgress.startQueryMaster()) { - stopQuery(queryId); - } - - return queryInProgress.getQueryInfo(); - } - - public TajoMaster.MasterContext getMasterContext() { - return masterContext; - } - - class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> { - @Override - public void handle(QueryJobEvent event) { - QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId()); - if(queryInProgress == null) { - LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]"); - return; - } - - if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { - stopQuery(event.getQueryInfo().getQueryId()); - } else if (queryInProgress.isStarted()) { - queryInProgress.getEventHandler().handle(event); - } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { - scheduler.removeQuery(queryInProgress.getQueryId()); - queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); - - stopQuery(queryInProgress.getQueryId()); - } - } - } - - public QueryInProgress getQueryInProgress(QueryId queryId) { - QueryInProgress queryInProgress; - synchronized (submittedQueries) { - queryInProgress = submittedQueries.get(queryId); - } - - if (queryInProgress == null) { - synchronized (runningQueries) { - queryInProgress = runningQueries.get(queryId); - } - } - return queryInProgress; - } - - public void stopQuery(QueryId queryId) { - LOG.info("Stop QueryInProgress:" + queryId); - QueryInProgress queryInProgress = getQueryInProgress(queryId); - if(queryInProgress != null) { - queryInProgress.stop(); - synchronized(submittedQueries) { - submittedQueries.remove(queryId); - } - - synchronized(runningQueries) { - runningQueries.remove(queryId); - } - - QueryInfo queryInfo = queryInProgress.getQueryInfo(); - long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime(); - if (executionTime < minExecutionTime.get()) { - minExecutionTime.set(executionTime); - } - - if (executionTime > maxExecutionTime.get()) { - maxExecutionTime.set(executionTime); - } - - long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get(); - if (totalExecutionTime > 0) { - avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1)); - } else { - avgExecutionTime.set(executionTime); - } - executedQuerySize.incrementAndGet(); - removeService(queryInProgress); - } else { - LOG.warn("No QueryInProgress while query stopping: " + queryId); - } - } - - public long getMinExecutionTime() { - if (getExecutedQuerySize() == 0) return 0; - return minExecutionTime.get(); - } - - public long getMaxExecutionTime() { - return maxExecutionTime.get(); - } - - public long getAvgExecutionTime() { - return avgExecutionTime.get(); - } - - public long getExecutedQuerySize() { - return executedQuerySize.get(); - } - - private void catchException(QueryId queryId, Exception e) { - LOG.error(e.getMessage(), e); - QueryInProgress queryInProgress = runningQueries.get(queryId); - queryInProgress.catchException(e); - } - - public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat( - TajoMasterProtocol.TajoHeartbeat queryHeartbeat) { - QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId())); - if(queryInProgress == null) { - return null; - } - - QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat); - getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo)); - - return null; - } - - private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) { - QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId())); - WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo()); - - queryInfo.setQueryMaster(connectionInfo.getHost()); - queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort()); - queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort()); - queryInfo.setLastMessage(queryHeartbeat.getStatusMessage()); - queryInfo.setQueryState(queryHeartbeat.getState()); - queryInfo.setProgress(queryHeartbeat.getQueryProgress()); - - if (queryHeartbeat.hasQueryFinishTime()) { - queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime()); - } - - if (queryHeartbeat.hasResultDesc()) { - queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc())); - } - - return queryInfo; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java deleted file mode 100644 index 641de78..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ /dev/null @@ -1,631 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tajo.*; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.planner.global.GlobalPlanner; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.TajoMasterProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.event.QueryStartEvent; -import org.apache.tajo.master.event.QueryStopEvent; -import org.apache.tajo.rpc.CallFuture; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.history.QueryHistory; -import org.apache.tajo.worker.TajoWorker; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat; -import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse; - -// TODO - when exception, send error status to QueryJobManager -public class QueryMaster extends CompositeService implements EventHandler { - private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName()); - - private int querySessionTimeout; - - private Clock clock; - - private AsyncDispatcher dispatcher; - - private GlobalPlanner globalPlanner; - - private TajoConf systemConf; - - private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap(); - - private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap(); - - private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread; - - private AtomicBoolean queryMasterStop = new AtomicBoolean(false); - - private QueryMasterContext queryMasterContext; - - private QueryContext queryContext; - - private QueryHeartbeatThread queryHeartbeatThread; - - private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread; - - private TajoWorker.WorkerContext workerContext; - - private RpcConnectionPool connPool; - - private ExecutorService eventExecutor; - - public QueryMaster(TajoWorker.WorkerContext workerContext) { - super(QueryMaster.class.getName()); - this.workerContext = workerContext; - } - - public void init(Configuration conf) { - LOG.info("QueryMaster init"); - try { - this.systemConf = (TajoConf)conf; - this.connPool = RpcConnectionPool.getPool(systemConf); - - querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT); - queryMasterContext = new QueryMasterContext(systemConf); - - clock = new SystemClock(); - - this.dispatcher = new AsyncDispatcher(); - addIfService(dispatcher); - - globalPlanner = new GlobalPlanner(systemConf, workerContext); - - dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler()); - dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler()); - - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - throw new RuntimeException(t); - } - super.init(conf); - } - - @Override - public void start() { - LOG.info("QueryMaster start"); - - queryHeartbeatThread = new QueryHeartbeatThread(); - queryHeartbeatThread.start(); - - clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread(); - clientSessionTimeoutCheckThread.start(); - - finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread(); - finishedQueryMasterTaskCleanThread.start(); - - eventExecutor = Executors.newSingleThreadExecutor(); - super.start(); - } - - @Override - public void stop() { - if(queryMasterStop.getAndSet(true)){ - return; - } - - if(queryHeartbeatThread != null) { - queryHeartbeatThread.interrupt(); - } - - if(clientSessionTimeoutCheckThread != null) { - clientSessionTimeoutCheckThread.interrupt(); - } - - if(finishedQueryMasterTaskCleanThread != null) { - finishedQueryMasterTaskCleanThread.interrupt(); - } - - if(eventExecutor != null){ - eventExecutor.shutdown(); - } - - super.stop(); - - LOG.info("QueryMaster stop"); - if(queryMasterContext.getWorkerContext().isYarnContainerMode()) { - queryMasterContext.getWorkerContext().stopWorker(true); - } - } - - protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) { - StringBuilder cleanupMessage = new StringBuilder(); - String prefix = ""; - for (TajoIdProtos.ExecutionBlockIdProto eachEbId: executionBlockIds) { - cleanupMessage.append(prefix).append(new ExecutionBlockId(eachEbId).toString()); - prefix = ","; - } - LOG.info("cleanup executionBlocks: " + cleanupMessage); - NettyClientBase rpc = null; - List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker(); - TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder(); - builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds)); - TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build(); - - for (TajoMasterProtocol.WorkerResourceProto worker : workers) { - try { - TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); - rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), - TajoWorkerProtocol.class, true); - TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); - - tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get()); - } catch (Exception e) { - continue; - } finally { - connPool.releaseConnection(rpc); - } - } - } - - private void cleanup(QueryId queryId) { - LOG.info("cleanup query resources : " + queryId); - NettyClientBase rpc = null; - List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker(); - - for (TajoMasterProtocol.WorkerResourceProto worker : workers) { - try { - TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); - rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), - TajoWorkerProtocol.class, true); - TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); - - tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get()); - } catch (Exception e) { - LOG.error(e.getMessage()); - } finally { - connPool.releaseConnection(rpc); - } - } - } - - public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() { - - NettyClientBase rpc = null; - try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( - HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress( - HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } else { - rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - - TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub(); - - CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack = - new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>(); - masterService.getAllWorkerResource(callBack.getController(), - PrimitiveProtos.NullProto.getDefaultInstance(), callBack); - - TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS); - return workerResourcesRequest.getWorkerResourcesList(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(rpc); - } - return new ArrayList<TajoMasterProtocol.WorkerResourceProto>(); - } - - public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) { - LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId); - NettyClientBase tmClient = null; - try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( - HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress( - HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } else { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); - - TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder() - .setConnectionInfo(workerContext.getConnectionInfo().getProto()) - .setState(state) - .setQueryId(queryId.getProto()); - - CallFuture<TajoHeartbeatResponse> callBack = - new CallFuture<TajoHeartbeatResponse>(); - - masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); - } - } - - @Override - public void handle(Event event) { - dispatcher.getEventHandler().handle(event); - } - - public Query getQuery(QueryId queryId) { - return queryMasterTasks.get(queryId).getQuery(); - } - - public QueryMasterTask getQueryMasterTask(QueryId queryId) { - return queryMasterTasks.get(queryId); - } - - public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) { - QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId); - if(queryMasterTask != null) { - return queryMasterTask; - } else { - if(includeFinished) { - return finishedQueryMasterTasks.get(queryId); - } else { - return null; - } - } - } - - public QueryMasterContext getContext() { - return this.queryMasterContext; - } - - public Collection<QueryMasterTask> getQueryMasterTasks() { - return queryMasterTasks.values(); - } - - public Collection<QueryMasterTask> getFinishedQueryMasterTasks() { - return finishedQueryMasterTasks.values(); - } - - public class QueryMasterContext { - private TajoConf conf; - - public QueryMasterContext(TajoConf conf) { - this.conf = conf; - } - - public TajoConf getConf() { - return conf; - } - - public ExecutorService getEventExecutor(){ - return eventExecutor; - } - - public AsyncDispatcher getDispatcher() { - return dispatcher; - } - - public Clock getClock() { - return clock; - } - - public QueryMaster getQueryMaster() { - return QueryMaster.this; - } - - public GlobalPlanner getGlobalPlanner() { - return globalPlanner; - } - - public TajoWorker.WorkerContext getWorkerContext() { - return workerContext; - } - - public EventHandler getEventHandler() { - return dispatcher.getEventHandler(); - } - - public void stopQuery(QueryId queryId) { - QueryMasterTask queryMasterTask = queryMasterTasks.remove(queryId); - if(queryMasterTask == null) { - LOG.warn("No query info:" + queryId); - return; - } - - finishedQueryMasterTasks.put(queryId, queryMasterTask); - - TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask); - CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>(); - - NettyClientBase tmClient = null; - try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } else { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); - masterClientService.heartbeat(future.getController(), queryHeartbeat, future); - } catch (Exception e) { - //this function will be closed in new thread. - //When tajo do stop cluster, tajo master maybe throw closed connection exception - - LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); - } - - try { - queryMasterTask.stop(); - if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) { - cleanup(queryId); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - Query query = queryMasterTask.getQuery(); - if (query != null) { - QueryHistory queryHisory = query.getQueryHistory(); - if (queryHisory != null) { - query.context.getQueryMasterContext().getWorkerContext(). - getTaskHistoryWriter().appendHistory(queryHisory); - } - } - if(workerContext.isYarnContainerMode()) { - stop(); - } - } - } - - private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) { - TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder(); - - builder.setConnectionInfo(workerContext.getConnectionInfo().getProto()); - builder.setQueryId(queryMasterTask.getQueryId().getProto()); - builder.setState(queryMasterTask.getState()); - if (queryMasterTask.getQuery() != null) { - if (queryMasterTask.getQuery().getResultDesc() != null) { - builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto()); - } - builder.setQueryProgress(queryMasterTask.getQuery().getProgress()); - builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime()); - } - return builder.build(); - } - - private class QueryStartEventHandler implements EventHandler<QueryStartEvent> { - @Override - public void handle(QueryStartEvent event) { - LOG.info("Start QueryStartEventHandler:" + event.getQueryId()); - QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext, - event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getLogicalPlanJson()); - - synchronized(queryMasterTasks) { - queryMasterTasks.put(event.getQueryId(), queryMasterTask); - } - - queryMasterTask.init(systemConf); - if (!queryMasterTask.isInitError()) { - queryMasterTask.start(); - } - - queryContext = event.getQueryContext(); - - if (queryMasterTask.isInitError()) { - queryMasterContext.stopQuery(queryMasterTask.getQueryId()); - return; - } - } - } - - private class QueryStopEventHandler implements EventHandler<QueryStopEvent> { - @Override - public void handle(QueryStopEvent event) { - queryMasterContext.stopQuery(event.getQueryId()); - } - } - - class QueryHeartbeatThread extends Thread { - public QueryHeartbeatThread() { - super("QueryHeartbeatThread"); - } - - @Override - public void run() { - LOG.info("Start QueryMaster heartbeat thread"); - while(!queryMasterStop.get()) { - List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>(); - synchronized(queryMasterTasks) { - tempTasks.addAll(queryMasterTasks.values()); - } - synchronized(queryMasterTasks) { - for(QueryMasterTask eachTask: tempTasks) { - NettyClientBase tmClient; - try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( - HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress( - HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } else { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); - - CallFuture<TajoHeartbeatResponse> callBack = - new CallFuture<TajoHeartbeatResponse>(); - - TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask); - masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack); - } catch (Throwable t) { - t.printStackTrace(); - } - } - } - synchronized(queryMasterStop) { - try { - queryMasterStop.wait(2000); - } catch (InterruptedException e) { - break; - } - } - } - LOG.info("QueryMaster heartbeat thread stopped"); - } - } - - class ClientSessionTimeoutCheckThread extends Thread { - public void run() { - LOG.info("ClientSessionTimeoutCheckThread started"); - while(!queryMasterStop.get()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - break; - } - List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>(); - synchronized(queryMasterTasks) { - tempTasks.addAll(queryMasterTasks.values()); - } - - for(QueryMasterTask eachTask: tempTasks) { - if(!eachTask.isStopped()) { - try { - long lastHeartbeat = eachTask.getLastClientHeartbeat(); - long time = System.currentTimeMillis() - lastHeartbeat; - if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) { - LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query session timeout: " + time + " ms"); - eachTask.expireQuerySession(); - } - } catch (Exception e) { - LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e); - } - } - } - } - } - } - - class FinishedQueryMasterTaskCleanThread extends Thread { - public void run() { - int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD); - LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime); - while(!queryMasterStop.get()) { - try { - Thread.sleep(60 * 1000 * 60); // hourly - } catch (InterruptedException e) { - break; - } - try { - long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000; - cleanExpiredFinishedQueryMasterTask(expireTime); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - } - - private void cleanExpiredFinishedQueryMasterTask(long expireTime) { - synchronized(finishedQueryMasterTasks) { - List<QueryId> expiredQueryIds = new ArrayList<QueryId>(); - for(Map.Entry<QueryId, QueryMasterTask> entry: finishedQueryMasterTasks.entrySet()) { - if(entry.getValue().getStartTime() < expireTime) { - expiredQueryIds.add(entry.getKey()); - } - } - - for(QueryId eachId: expiredQueryIds) { - finishedQueryMasterTasks.remove(eachId); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java deleted file mode 100644 index 9f7d3f8..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import com.google.common.base.Preconditions; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.tajo.*; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.QueryMasterProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.DefaultTaskScheduler; -import org.apache.tajo.master.container.TajoContainerId; -import org.apache.tajo.master.event.*; -import org.apache.tajo.master.session.Session; -import org.apache.tajo.rpc.AsyncRpcServer; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.worker.TajoWorker; - -import java.net.InetSocketAddress; - -public class QueryMasterManagerService extends CompositeService - implements QueryMasterProtocol.QueryMasterProtocolService.Interface { - private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName()); - - private AsyncRpcServer rpcServer; - private InetSocketAddress bindAddr; - private String addr; - private int port; - - private QueryMaster queryMaster; - - private TajoWorker.WorkerContext workerContext; - - public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) { - super(QueryMasterManagerService.class.getName()); - this.workerContext = workerContext; - this.port = port; - } - - public QueryMaster getQueryMaster() { - return queryMaster; - } - - @Override - public void init(Configuration conf) { - Preconditions.checkArgument(conf instanceof TajoConf); - TajoConf tajoConf = (TajoConf) conf; - try { - // Setup RPC server - InetSocketAddress initIsa = - new InetSocketAddress("0.0.0.0", port); - if (initIsa.getAddress() == null) { - throw new IllegalArgumentException("Failed resolve of " + initIsa); - } - - int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM); - this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum); - this.rpcServer.start(); - - this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress()); - this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort(); - - this.port = bindAddr.getPort(); - - queryMaster = new QueryMaster(workerContext); - addService(queryMaster); - - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - // Get the master address - LOG.info("QueryMasterManagerService is bind to " + addr); - ((TajoConf)conf).setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr); - - super.init(conf); - } - - @Override - public void start() { - super.start(); - } - - @Override - public void stop() { - if(rpcServer != null) { - rpcServer.shutdown(); - } - LOG.info("QueryMasterManagerService stopped"); - super.stop(); - } - - public InetSocketAddress getBindAddr() { - return bindAddr; - } - - public String getHostAndPort() { - return bindAddr.getHostName() + ":" + bindAddr.getPort(); - } - - @Override - public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request, - RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) { - try { - ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId()); - QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId()); - - if(queryMasterTask == null || queryMasterTask.isStopped()) { - done.run(DefaultTaskScheduler.stopTaskRunnerReq); - } else { - TajoContainerId cid = - queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId()); - LOG.debug("getTask:" + cid + ", ebId:" + ebId); - queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done)); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - - @Override - public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request, - RpcCallback<PrimitiveProtos.BoolProto> done) { - try { - QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId()); - TaskAttemptId attemptId = new TaskAttemptId(request.getId()); - QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId); - if (queryMasterTask == null) { - queryMasterTask = queryMaster.getQueryMasterTask(queryId, true); - } - Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId()); - Task task = sq.getTask(attemptId.getTaskId()); - TaskAttempt attempt = task.getAttempt(attemptId.getId()); - - if(LOG.isDebugEnabled()){ - LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name())); - } - - if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) { - LOG.warn(attemptId + " Killed"); - attempt.handle( - new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED)); - } else { - queryMasterTask.getEventHandler().handle( - new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request)); - } - done.run(TajoWorker.TRUE_PROTO); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); - } - } - - @Override - public void ping(RpcController controller, - TajoIdProtos.ExecutionBlockIdProto requestProto, - RpcCallback<PrimitiveProtos.BoolProto> done) { - done.run(TajoWorker.TRUE_PROTO); - } - - @Override - public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report, - RpcCallback<PrimitiveProtos.BoolProto> done) { - try { - QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( - new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); - if (queryMasterTask != null) { - queryMasterTask.handleTaskFailed(report); - } else { - LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId())); - } - done.run(TajoWorker.TRUE_PROTO); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); - } - } - - @Override - public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report, - RpcCallback<PrimitiveProtos.BoolProto> done) { - try { - QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( - new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); - if (queryMasterTask != null) { - queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report)); - } - done.run(TajoWorker.TRUE_PROTO); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); - } - } - - @Override - public void doneExecutionBlock( - RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request, - RpcCallback<PrimitiveProtos.BoolProto> done) { - QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId())); - if (queryMasterTask != null) { - ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId()); - queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request); - } - done.run(TajoWorker.TRUE_PROTO); - } - - @Override - public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request, - RpcCallback<PrimitiveProtos.BoolProto> done) { - QueryId queryId = new QueryId(request); - QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId); - if (queryMasterTask != null) { - Query query = queryMasterTask.getQuery(); - if (query != null) { - query.handle(new QueryEvent(queryId, QueryEventType.KILL)); - } - } - } - - @Override - public void executeQuery(RpcController controller, - TajoWorkerProtocol.QueryExecutionRequestProto request, - RpcCallback<PrimitiveProtos.BoolProto> done) { - try { - workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc(); - - QueryId queryId = new QueryId(request.getQueryId()); - LOG.info("Receive executeQuery request:" + queryId); - queryMaster.handle(new QueryStartEvent(queryId, - new Session(request.getSession()), - new QueryContext(workerContext.getQueryMaster().getContext().getConf(), - request.getQueryContext()), request.getExprInJson().getValue(), - request.getLogicalPlanJson().getValue())); - done.run(TajoWorker.TRUE_PROTO); - } catch (Exception e) { - workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc(); - LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java deleted file mode 100644 index 56dd789..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.util.RackResolver; -import org.apache.tajo.QueryId; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.util.TajoIdUtils; - -import java.io.PrintWriter; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; - -@Deprecated -public class QueryMasterRunner extends AbstractService { - private static final Log LOG = LogFactory.getLog(QueryMasterRunner.class); - private TajoConf systemConf; - private QueryMaster queryMaster; - private QueryId queryId; - private String queryMasterManagerAddress; - - public QueryMasterRunner(QueryId queryId, String queryMasterManagerAddress) { - super(QueryMasterRunner.class.getName()); - this.queryId = queryId; - this.queryMasterManagerAddress = queryMasterManagerAddress; - } - - private class ShutdownHook implements Runnable { - @Override - public void run() { - LOG.info("============================================"); - LOG.info("QueryMaster received SIGINT Signal"); - LOG.info("============================================"); - stop(); - } - } - - @Override - public void init(Configuration conf) { - this.systemConf = (TajoConf)conf; - RackResolver.init(systemConf); - Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); - super.init(conf); - } - - @Override - public void start() { - //create QueryMaster - QueryMaster query = new QueryMaster(null); - - query.init(systemConf); - query.start(); - } - - @Override - public void stop() { - } - - public static void main(String[] args) throws Exception { - LOG.info("QueryMasterRunner started"); - - final TajoConf conf = new TajoConf(); - conf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME)); - - UserGroupInformation.setConfiguration(conf); - - final QueryId queryId = TajoIdUtils.parseQueryId(args[0]); - final String queryMasterManagerAddr = args[1]; - - LOG.info("Received QueryId:" + queryId); - - QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, queryMasterManagerAddr); - queryMasterRunner.init(conf); - queryMasterRunner.start(); - - synchronized(queryId) { - queryId.wait(); - } - - System.exit(0); - } - - public static void printThreadInfo(PrintWriter stream, String title) { - ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - final int STACK_DEPTH = 60; - boolean contention = threadBean.isThreadContentionMonitoringEnabled(); - long[] threadIds = threadBean.getAllThreadIds(); - stream.println("Process Thread Dump: " + title); - stream.println(threadIds.length + " active threads"); - for (long tid : threadIds) { - ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH); - if (info == null) { - stream.println(" Inactive"); - continue; - } - stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":"); - Thread.State state = info.getThreadState(); - stream.println(" State: " + state); - stream.println(" Blocked count: " + info.getBlockedCount()); - stream.println(" Waited count: " + info.getWaitedCount()); - if (contention) { - stream.println(" Blocked time: " + info.getBlockedTime()); - stream.println(" Waited time: " + info.getWaitedTime()); - } - if (state == Thread.State.WAITING) { - stream.println(" Waiting on " + info.getLockName()); - } else if (state == Thread.State.BLOCKED) { - stream.println(" Blocked on " + info.getLockName()); - stream.println(" Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName())); - } - stream.println(" Stack:"); - for (StackTraceElement frame : info.getStackTrace()) { - stream.println(" " + frame.toString()); - } - } - stream.flush(); - } - - private static String getTaskName(long id, String name) { - if (name == null) { - return Long.toString(id); - } - return id + " (" + name + ")"; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java deleted file mode 100644 index 9c789a5..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ /dev/null @@ -1,638 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.tajo.*; -import org.apache.tajo.algebra.Expr; -import org.apache.tajo.algebra.JsonHelper; -import org.apache.tajo.catalog.CatalogService; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.exception.UnimplementedException; -import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.TajoMasterProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.TajoContainerProxy; -import org.apache.tajo.master.event.*; -import org.apache.tajo.master.rm.TajoWorkerResourceManager; -import org.apache.tajo.master.session.Session; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; -import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.plan.logical.NodeType; -import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.verifier.VerifyException; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.StorageProperty; -import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.util.metrics.TajoMetrics; -import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; -import org.apache.tajo.worker.AbstractResourceAllocator; -import org.apache.tajo.worker.TajoResourceAllocator; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.tajo.TajoProtos.QueryState; - -public class QueryMasterTask extends CompositeService { - private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName()); - - // query submission directory is private! - final public static FsPermission STAGING_DIR_PERMISSION = - FsPermission.createImmutable((short) 0700); // rwx-------- - - public static final String TMP_STAGING_DIR_PREFIX = ".staging"; - - private QueryId queryId; - - private Session session; - - private QueryContext queryContext; - - private QueryMasterTaskContext queryTaskContext; - - private QueryMaster.QueryMasterContext queryMasterContext; - - private Query query; - - private MasterPlan masterPlan; - - private String jsonExpr; - - private String logicalPlanJson; - - private AsyncDispatcher dispatcher; - - private final long querySubmitTime; - - private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>(); - - private TajoConf systemConf; - - private AtomicLong lastClientHeartbeat = new AtomicLong(-1); - - private AbstractResourceAllocator resourceAllocator; - - private AtomicBoolean stopped = new AtomicBoolean(false); - - private TajoMetrics queryMetrics; - - private Throwable initError; - - private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics = - new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>(); - - public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext, - QueryId queryId, Session session, QueryContext queryContext, String jsonExpr, - String logicalPlanJson) { - - super(QueryMasterTask.class.getName()); - this.queryMasterContext = queryMasterContext; - this.queryId = queryId; - this.session = session; - this.queryContext = queryContext; - this.jsonExpr = jsonExpr; - this.logicalPlanJson = logicalPlanJson; - this.querySubmitTime = System.currentTimeMillis(); - } - - @Override - public void init(Configuration conf) { - systemConf = (TajoConf)conf; - - try { - queryTaskContext = new QueryMasterTaskContext(); - String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS); - - if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) { - resourceAllocator = new TajoResourceAllocator(queryTaskContext); - } else { - throw new UnimplementedException(resourceManagerClassName + " is not supported yet"); - } - addService(resourceAllocator); - - dispatcher = new AsyncDispatcher(); - addService(dispatcher); - - dispatcher.register(StageEventType.class, new StageEventDispatcher()); - dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); - dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); - dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler()); - dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher()); - dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler()); - - initStagingDir(); - - queryMetrics = new TajoMetrics(queryId.toString()); - - super.init(systemConf); - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - initError = t; - } - } - - public boolean isStopped() { - return stopped.get(); - } - - @Override - public void start() { - startQuery(); - super.start(); - } - - @Override - public void stop() { - - if(stopped.getAndSet(true)) { - return; - } - - LOG.info("Stopping QueryMasterTask:" + queryId); - - try { - resourceAllocator.stop(); - } catch (Throwable t) { - LOG.fatal(t.getMessage(), t); - } - - RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf()); - NettyClientBase tmClient = null; - try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( - HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress( - HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } else { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); - } - - super.stop(); - - //TODO change report to tajo master - if (queryMetrics != null) { - queryMetrics.report(new MetricsConsoleReporter()); - } - - LOG.info("Stopped QueryMasterTask:" + queryId); - } - - public void handleTaskRequestEvent(TaskRequestEvent event) { - ExecutionBlockId id = event.getExecutionBlockId(); - query.getStage(id).handleTaskRequestEvent(event); - } - - public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) { - synchronized(diagnostics) { - if (diagnostics.size() < 10) { - diagnostics.add(report); - } - } - - getEventHandler().handle(new TaskFatalErrorEvent(report)); - } - - public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() { - synchronized(diagnostics) { - return Collections.unmodifiableCollection(diagnostics); - } - } - - private class StageEventDispatcher implements EventHandler<StageEvent> { - public void handle(StageEvent event) { - ExecutionBlockId id = event.getStageId(); - if(LOG.isDebugEnabled()) { - LOG.debug("StageEventDispatcher:" + id + "," + event.getType()); - } - query.getStage(id).handle(event); - } - } - - private class TaskEventDispatcher - implements EventHandler<TaskEvent> { - public void handle(TaskEvent event) { - TaskId taskId = event.getTaskId(); - if(LOG.isDebugEnabled()) { - LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType()); - } - Task task = query.getStage(taskId.getExecutionBlockId()). - getTask(taskId); - task.handle(event); - } - } - - private class TaskAttemptEventDispatcher - implements EventHandler<TaskAttemptEvent> { - public void handle(TaskAttemptEvent event) { - TaskAttemptId attemptId = event.getTaskAttemptId(); - Stage stage = query.getStage(attemptId.getTaskId().getExecutionBlockId()); - Task task = stage.getTask(attemptId.getTaskId()); - TaskAttempt attempt = task.getAttempt(attemptId); - attempt.handle(event); - } - } - - private class TaskSchedulerDispatcher - implements EventHandler<TaskSchedulerEvent> { - public void handle(TaskSchedulerEvent event) { - Stage stage = query.getStage(event.getExecutionBlockId()); - stage.getTaskScheduler().handle(event); - } - } - - private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> { - @Override - public void handle(LocalTaskEvent event) { - TajoContainerProxy proxy = (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId()); - if (proxy != null) { - proxy.killTaskAttempt(event.getTaskAttemptId()); - } - } - } - - private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> { - @Override - public void handle(QueryMasterQueryCompletedEvent event) { - QueryId queryId = event.getQueryId(); - LOG.info("Query completion notified from " + queryId); - - while (!isTerminatedState(query.getSynchronizedState())) { - try { - synchronized (this) { - wait(10); - } - } catch (InterruptedException e) { - LOG.error(e); - } - } - LOG.info("Query final state: " + query.getSynchronizedState()); - - queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId)); - } - } - - private static boolean isTerminatedState(QueryState state) { - return - state == QueryState.QUERY_SUCCEEDED || - state == QueryState.QUERY_FAILED || - state == QueryState.QUERY_KILLED || - state == QueryState.QUERY_ERROR; - } - - public synchronized void startQuery() { - StorageManager sm = null; - LogicalPlan plan = null; - try { - if (query != null) { - LOG.warn("Query already started"); - return; - } - CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog(); - LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); - Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); - jsonExpr = null; // remove the possible OOM - plan = planner.createPlan(queryContext, expr); - - StoreType storeType = PlannerUtil.getStoreType(plan); - if (storeType != null) { - sm = StorageManager.getStorageManager(systemConf, storeType); - StorageProperty storageProperty = sm.getStorageProperty(); - if (storageProperty.isSortedInsert()) { - String tableName = PlannerUtil.getStoreTableName(plan); - LogicalRootNode rootNode = plan.getRootBlock().getRoot(); - TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); - if (tableDesc == null) { - throw new VerifyException("Can't get table meta data from catalog: " + tableName); - } - List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules( - getQueryTaskContext().getQueryContext(), tableDesc); - if (storageSpecifiedRewriteRules != null) { - for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) { - optimizer.addRuleAfterToJoinOpt(eachRule); - } - } - } - } - - optimizer.optimize(queryContext, plan); - - for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) { - LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN); - if (scanNodes != null) { - for (LogicalNode eachScanNode : scanNodes) { - ScanNode scanNode = (ScanNode) eachScanNode; - tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); - } - } - - scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN); - if (scanNodes != null) { - for (LogicalNode eachScanNode : scanNodes) { - ScanNode scanNode = (ScanNode) eachScanNode; - tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); - } - } - } - MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); - queryMasterContext.getGlobalPlanner().build(masterPlan); - - query = new Query(queryTaskContext, queryId, querySubmitTime, - "", queryTaskContext.getEventHandler(), masterPlan); - - dispatcher.register(QueryEventType.class, query); - queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START)); - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - initError = t; - - if (plan != null && sm != null) { - LogicalRootNode rootNode = plan.getRootBlock().getRoot(); - try { - sm.rollbackOutputCommit(rootNode.getChild()); - } catch (IOException e) { - LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); - } - } - } - } - - private void initStagingDir() throws IOException { - Path stagingDir = null; - FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf); - - try { - - stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext); - - // Create a subdirectories - LOG.info("The staging dir '" + stagingDir + "' is created."); - queryContext.setStagingDir(stagingDir); - } catch (IOException ioe) { - if (stagingDir != null && defaultFS.exists(stagingDir)) { - try { - defaultFS.delete(stagingDir, true); - LOG.info("The staging directory '" + stagingDir + "' is deleted"); - } catch (Exception e) { - LOG.warn(e.getMessage()); - } - } - - throw ioe; - } - } - - /** - * It initializes the final output and staging directory and sets - * them to variables. - */ - public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException { - - String realUser; - String currentUser; - UserGroupInformation ugi; - ugi = UserGroupInformation.getLoginUser(); - realUser = ugi.getShortUserName(); - currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); - - FileSystem fs; - Path stagingDir; - - //////////////////////////////////////////// - // Create Output Directory - //////////////////////////////////////////// - - String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, ""); - if (context.isCreateTable() || context.isInsert()) { - if (outputPath == null || outputPath.isEmpty()) { - // hbase - stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); - } else { - stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId); - } - } else { - stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); - } - - // initializ - fs = stagingDir.getFileSystem(conf); - - if (fs.exists(stagingDir)) { - throw new IOException("The staging directory '" + stagingDir + "' already exists"); - } - fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); - FileStatus fsStatus = fs.getFileStatus(stagingDir); - String owner = fsStatus.getOwner(); - - if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) { - throw new IOException("The ownership on the user's query " + - "directory " + stagingDir + " is not as expected. " + - "It is owned by " + owner + ". The directory must " + - "be owned by the submitter " + currentUser + " or " + - "by " + realUser); - } - - if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) { - LOG.info("Permissions on staging directory " + stagingDir + " are " + - "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + - "to correct value " + STAGING_DIR_PERMISSION); - fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); - } - - Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - fs.mkdirs(stagingResultDir); - - return stagingDir; - } - - public Query getQuery() { - return query; - } - - protected void expireQuerySession() { - if(!isTerminatedState(query.getState()) && !(query.getState() == QueryState.QUERY_KILL_WAIT)){ - query.handle(new QueryEvent(queryId, QueryEventType.KILL)); - } - } - - public QueryMasterTaskContext getQueryTaskContext() { - return queryTaskContext; - } - - public EventHandler getEventHandler() { - return queryTaskContext.getEventHandler(); - } - - public void touchSessionTime() { - this.lastClientHeartbeat.set(System.currentTimeMillis()); - } - - public long getLastClientHeartbeat() { - return this.lastClientHeartbeat.get(); - } - - public QueryId getQueryId() { - return queryId; - } - - public boolean isInitError() { - return initError != null; - } - - public QueryState getState() { - if(query == null) { - if (isInitError()) { - return QueryState.QUERY_ERROR; - } else { - return QueryState.QUERY_NOT_ASSIGNED; - } - } else { - return query.getState(); - } - } - - public Throwable getInitError() { - return initError; - } - - public String getErrorMessage() { - if (isInitError()) { - return StringUtils.stringifyException(initError); - } else { - return null; - } - } - - public long getQuerySubmitTime() { - return this.querySubmitTime; - } - - public class QueryMasterTaskContext { - EventHandler eventHandler; - public QueryMaster.QueryMasterContext getQueryMasterContext() { - return queryMasterContext; - } - - public Session getSession() { - return session; - } - - public QueryContext getQueryContext() { - return queryContext; - } - - public TajoConf getConf() { - return systemConf; - } - - public Clock getClock() { - return queryMasterContext.getClock(); - } - - public Query getQuery() { - return query; - } - - public QueryId getQueryId() { - return queryId; - } - - public Path getStagingDir() { - return queryContext.getStagingDir(); - } - - public synchronized EventHandler getEventHandler() { - if(eventHandler == null) { - eventHandler = dispatcher.getEventHandler(); - } - return eventHandler; - } - - public AsyncDispatcher getDispatcher() { - return dispatcher; - } - - public Stage getStage(ExecutionBlockId id) { - return query.getStage(id); - } - - public Map<String, TableDesc> getTableDescMap() { - return tableDescMap; - } - - public float getProgress() { - if(query == null) { - return 0.0f; - } - return query.getProgress(); - } - - public AbstractResourceAllocator getResourceAllocator() { - return resourceAllocator; - } - - public TajoMetrics getQueryMetrics() { - return queryMetrics; - } - } -} \ No newline at end of file
