http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 0000000,bab5903..742665a mode 000000,100644..100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@@ -1,0 -1,638 +1,650 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.tajo.querymaster; + + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.fs.permission.FsPermission; + import org.apache.hadoop.security.UserGroupInformation; + import org.apache.hadoop.service.CompositeService; + import org.apache.hadoop.util.StringUtils; + import org.apache.hadoop.yarn.event.AsyncDispatcher; + import org.apache.hadoop.yarn.event.EventHandler; + import org.apache.hadoop.yarn.util.Clock; + import org.apache.tajo.*; + import org.apache.tajo.algebra.Expr; + import org.apache.tajo.algebra.JsonHelper; + import org.apache.tajo.catalog.CatalogService; + import org.apache.tajo.catalog.TableDesc; + import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; + import org.apache.tajo.conf.TajoConf; ++import org.apache.tajo.plan.LogicalOptimizer; ++import org.apache.tajo.plan.LogicalPlan; ++import org.apache.tajo.plan.LogicalPlanner; ++import org.apache.tajo.plan.logical.LogicalRootNode; ++import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; ++import org.apache.tajo.plan.util.PlannerUtil; + import org.apache.tajo.engine.planner.global.MasterPlan; + import org.apache.tajo.engine.query.QueryContext; + import org.apache.tajo.exception.UnimplementedException; + import org.apache.tajo.ha.HAServiceUtil; + import org.apache.tajo.ipc.TajoMasterProtocol; + import org.apache.tajo.ipc.TajoWorkerProtocol; + import org.apache.tajo.master.TajoContainerProxy; + import org.apache.tajo.master.event.*; + import org.apache.tajo.master.rm.TajoWorkerResourceManager; + import org.apache.tajo.session.Session; + import org.apache.tajo.plan.LogicalOptimizer; + import org.apache.tajo.plan.LogicalPlan; + import org.apache.tajo.plan.LogicalPlanner; + import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.LogicalRootNode; + import org.apache.tajo.plan.logical.NodeType; + import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; -import org.apache.tajo.plan.util.PlannerUtil; + import org.apache.tajo.plan.verifier.VerifyException; + import org.apache.tajo.rpc.NettyClientBase; + import org.apache.tajo.rpc.RpcConnectionPool; + import org.apache.tajo.storage.StorageManager; + import org.apache.tajo.storage.StorageProperty; + import org.apache.tajo.storage.StorageUtil; + import org.apache.tajo.util.metrics.TajoMetrics; + import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; + import org.apache.tajo.worker.AbstractResourceAllocator; + import org.apache.tajo.worker.TajoResourceAllocator; + + import java.io.IOException; + import java.util.*; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicLong; + + import static org.apache.tajo.TajoProtos.QueryState; + + public class QueryMasterTask extends CompositeService { + private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName()); + + // query submission directory is private! + final public static FsPermission STAGING_DIR_PERMISSION = + FsPermission.createImmutable((short) 0700); // rwx-------- + + public static final String TMP_STAGING_DIR_PREFIX = ".staging"; + + private QueryId queryId; + + private Session session; + + private QueryContext queryContext; + + private QueryMasterTaskContext queryTaskContext; + + private QueryMaster.QueryMasterContext queryMasterContext; + + private Query query; + + private MasterPlan masterPlan; + + private String jsonExpr; + + private String logicalPlanJson; + + private AsyncDispatcher dispatcher; + + private final long querySubmitTime; + + private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>(); + + private TajoConf systemConf; + + private AtomicLong lastClientHeartbeat = new AtomicLong(-1); + + private AbstractResourceAllocator resourceAllocator; + + private AtomicBoolean stopped = new AtomicBoolean(false); + + private TajoMetrics queryMetrics; + + private Throwable initError; + + private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics = + new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>(); + + public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext, + QueryId queryId, Session session, QueryContext queryContext, String jsonExpr, + String logicalPlanJson) { + + super(QueryMasterTask.class.getName()); + this.queryMasterContext = queryMasterContext; + this.queryId = queryId; + this.session = session; + this.queryContext = queryContext; + this.jsonExpr = jsonExpr; + this.logicalPlanJson = logicalPlanJson; + this.querySubmitTime = System.currentTimeMillis(); + } + + @Override + public void init(Configuration conf) { + systemConf = (TajoConf)conf; + + try { + queryTaskContext = new QueryMasterTaskContext(); + String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS); + + if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) { + resourceAllocator = new TajoResourceAllocator(queryTaskContext); + } else { + throw new UnimplementedException(resourceManagerClassName + " is not supported yet"); + } + addService(resourceAllocator); + + dispatcher = new AsyncDispatcher(); + addService(dispatcher); + + dispatcher.register(StageEventType.class, new StageEventDispatcher()); + dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); + dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); + dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler()); + dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher()); + dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler()); + + initStagingDir(); + + queryMetrics = new TajoMetrics(queryId.toString()); + + super.init(systemConf); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + initError = t; + } + } + + public boolean isStopped() { + return stopped.get(); + } + + @Override + public void start() { + startQuery(); + super.start(); + } + + @Override + public void stop() { + + if(stopped.getAndSet(true)) { + return; + } + + LOG.info("Stopping QueryMasterTask:" + queryId); + + try { + resourceAllocator.stop(); + } catch (Throwable t) { + LOG.fatal(t.getMessage(), t); + } + + RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf()); + NettyClientBase tmClient = null; + try { + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( + HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress( + HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } finally { + connPool.releaseConnection(tmClient); + } + + super.stop(); + + //TODO change report to tajo master + if (queryMetrics != null) { + queryMetrics.report(new MetricsConsoleReporter()); + } + + LOG.info("Stopped QueryMasterTask:" + queryId); + } + + public void handleTaskRequestEvent(TaskRequestEvent event) { + ExecutionBlockId id = event.getExecutionBlockId(); + query.getStage(id).handleTaskRequestEvent(event); + } + + public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) { + synchronized(diagnostics) { + if (diagnostics.size() < 10) { + diagnostics.add(report); + } + } + + getEventHandler().handle(new TaskFatalErrorEvent(report)); + } + + public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() { + synchronized(diagnostics) { + return Collections.unmodifiableCollection(diagnostics); + } + } + + private class StageEventDispatcher implements EventHandler<StageEvent> { + public void handle(StageEvent event) { + ExecutionBlockId id = event.getStageId(); + if(LOG.isDebugEnabled()) { + LOG.debug("StageEventDispatcher:" + id + "," + event.getType()); + } + query.getStage(id).handle(event); + } + } + + private class TaskEventDispatcher + implements EventHandler<TaskEvent> { + public void handle(TaskEvent event) { + TaskId taskId = event.getTaskId(); + if(LOG.isDebugEnabled()) { + LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType()); + } + Task task = query.getStage(taskId.getExecutionBlockId()). + getTask(taskId); + task.handle(event); + } + } + + private class TaskAttemptEventDispatcher + implements EventHandler<TaskAttemptEvent> { + public void handle(TaskAttemptEvent event) { + TaskAttemptId attemptId = event.getTaskAttemptId(); + Stage stage = query.getStage(attemptId.getTaskId().getExecutionBlockId()); + Task task = stage.getTask(attemptId.getTaskId()); + TaskAttempt attempt = task.getAttempt(attemptId); + attempt.handle(event); + } + } + + private class TaskSchedulerDispatcher + implements EventHandler<TaskSchedulerEvent> { + public void handle(TaskSchedulerEvent event) { + Stage stage = query.getStage(event.getExecutionBlockId()); + stage.getTaskScheduler().handle(event); + } + } + + private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> { + @Override + public void handle(LocalTaskEvent event) { + TajoContainerProxy proxy = (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId()); + if (proxy != null) { + proxy.killTaskAttempt(event.getTaskAttemptId()); + } + } + } + + private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> { + @Override + public void handle(QueryMasterQueryCompletedEvent event) { + QueryId queryId = event.getQueryId(); + LOG.info("Query completion notified from " + queryId); + + while (!isTerminatedState(query.getSynchronizedState())) { + try { + synchronized (this) { + wait(10); + } + } catch (InterruptedException e) { + LOG.error(e); + } + } + LOG.info("Query final state: " + query.getSynchronizedState()); + + queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId)); + } + } + + private static boolean isTerminatedState(QueryState state) { + return + state == QueryState.QUERY_SUCCEEDED || + state == QueryState.QUERY_FAILED || + state == QueryState.QUERY_KILLED || + state == QueryState.QUERY_ERROR; + } + + public synchronized void startQuery() { + StorageManager sm = null; + LogicalPlan plan = null; + try { + if (query != null) { + LOG.warn("Query already started"); + return; + } ++ LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED)); + CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog(); + LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(systemConf); ++ LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog); + Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class); + jsonExpr = null; // remove the possible OOM + plan = planner.createPlan(queryContext, expr); + + StoreType storeType = PlannerUtil.getStoreType(plan); + if (storeType != null) { + sm = StorageManager.getStorageManager(systemConf, storeType); + StorageProperty storageProperty = sm.getStorageProperty(); + if (storageProperty.isSortedInsert()) { + String tableName = PlannerUtil.getStoreTableName(plan); + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); + if (tableDesc == null) { + throw new VerifyException("Can't get table meta data from catalog: " + tableName); + } + List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules( + getQueryTaskContext().getQueryContext(), tableDesc); + if (storageSpecifiedRewriteRules != null) { + for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) { + optimizer.addRuleAfterToJoinOpt(eachRule); + } + } + } + } + + optimizer.optimize(queryContext, plan); + + for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) { + LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN); + if (scanNodes != null) { + for (LogicalNode eachScanNode : scanNodes) { + ScanNode scanNode = (ScanNode) eachScanNode; + tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + } + } + + scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN); + if (scanNodes != null) { + for (LogicalNode eachScanNode : scanNodes) { + ScanNode scanNode = (ScanNode) eachScanNode; + tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + } + } ++ ++ scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.INDEX_SCAN); ++ if (scanNodes != null) { ++ for (LogicalNode eachScanNode : scanNodes) { ++ ScanNode scanNode = (ScanNode) eachScanNode; ++ tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); ++ } ++ } + } + MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); + queryMasterContext.getGlobalPlanner().build(masterPlan); + + query = new Query(queryTaskContext, queryId, querySubmitTime, + "", queryTaskContext.getEventHandler(), masterPlan); + + dispatcher.register(QueryEventType.class, query); + queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START)); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + initError = t; + + if (plan != null && sm != null) { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + try { + sm.rollbackOutputCommit(rootNode.getChild()); + } catch (IOException e) { + LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); + } + } + } + } + + private void initStagingDir() throws IOException { + Path stagingDir = null; + FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf); + + try { + + stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext); + + // Create a subdirectories + LOG.info("The staging dir '" + stagingDir + "' is created."); + queryContext.setStagingDir(stagingDir); + } catch (IOException ioe) { + if (stagingDir != null && defaultFS.exists(stagingDir)) { + try { + defaultFS.delete(stagingDir, true); + LOG.info("The staging directory '" + stagingDir + "' is deleted"); + } catch (Exception e) { + LOG.warn(e.getMessage()); + } + } + + throw ioe; + } + } + + /** + * It initializes the final output and staging directory and sets + * them to variables. + */ + public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException { + + String realUser; + String currentUser; + UserGroupInformation ugi; + ugi = UserGroupInformation.getLoginUser(); + realUser = ugi.getShortUserName(); + currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); + + FileSystem fs; + Path stagingDir; + + //////////////////////////////////////////// + // Create Output Directory + //////////////////////////////////////////// + + String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, ""); + if (context.isCreateTable() || context.isInsert()) { + if (outputPath == null || outputPath.isEmpty()) { + // hbase + stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); + } else { + stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId); + } + } else { + stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); + } + + // initializ + fs = stagingDir.getFileSystem(conf); + + if (fs.exists(stagingDir)) { + throw new IOException("The staging directory '" + stagingDir + "' already exists"); + } + fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); + FileStatus fsStatus = fs.getFileStatus(stagingDir); + String owner = fsStatus.getOwner(); + + if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) { + throw new IOException("The ownership on the user's query " + + "directory " + stagingDir + " is not as expected. " + + "It is owned by " + owner + ". The directory must " + + "be owned by the submitter " + currentUser + " or " + + "by " + realUser); + } + + if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) { + LOG.info("Permissions on staging directory " + stagingDir + " are " + + "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + + "to correct value " + STAGING_DIR_PERMISSION); + fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); + } + + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + fs.mkdirs(stagingResultDir); + + return stagingDir; + } + + public Query getQuery() { + return query; + } + + protected void expireQuerySession() { + if(!isTerminatedState(query.getState()) && !(query.getState() == QueryState.QUERY_KILL_WAIT)){ + query.handle(new QueryEvent(queryId, QueryEventType.KILL)); + } + } + + public QueryMasterTaskContext getQueryTaskContext() { + return queryTaskContext; + } + + public EventHandler getEventHandler() { + return queryTaskContext.getEventHandler(); + } + + public void touchSessionTime() { + this.lastClientHeartbeat.set(System.currentTimeMillis()); + } + + public long getLastClientHeartbeat() { + return this.lastClientHeartbeat.get(); + } + + public QueryId getQueryId() { + return queryId; + } + + public boolean isInitError() { + return initError != null; + } + + public QueryState getState() { + if(query == null) { + if (isInitError()) { + return QueryState.QUERY_ERROR; + } else { + return QueryState.QUERY_NOT_ASSIGNED; + } + } else { + return query.getState(); + } + } + + public Throwable getInitError() { + return initError; + } + + public String getErrorMessage() { + if (isInitError()) { + return StringUtils.stringifyException(initError); + } else { + return null; + } + } + + public long getQuerySubmitTime() { + return this.querySubmitTime; + } + + public class QueryMasterTaskContext { + EventHandler eventHandler; + public QueryMaster.QueryMasterContext getQueryMasterContext() { + return queryMasterContext; + } + + public Session getSession() { + return session; + } + + public QueryContext getQueryContext() { + return queryContext; + } + + public TajoConf getConf() { + return systemConf; + } + + public Clock getClock() { + return queryMasterContext.getClock(); + } + + public Query getQuery() { + return query; + } + + public QueryId getQueryId() { + return queryId; + } + + public Path getStagingDir() { + return queryContext.getStagingDir(); + } + + public synchronized EventHandler getEventHandler() { + if(eventHandler == null) { + eventHandler = dispatcher.getEventHandler(); + } + return eventHandler; + } + + public AsyncDispatcher getDispatcher() { + return dispatcher; + } + + public Stage getStage(ExecutionBlockId id) { + return query.getStage(id); + } + + public Map<String, TableDesc> getTableDescMap() { + return tableDescMap; + } + + public float getProgress() { + if(query == null) { + return 0.0f; + } + return query.getProgress(); + } + + public AbstractResourceAllocator getResourceAllocator() { + return resourceAllocator; + } + + public TajoMetrics getQueryMetrics() { + return queryMetrics; + } + } + }
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java ---------------------------------------------------------------------- diff --cc tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 6a13898,2ae4bed..c10d3b7 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@@ -31,10 -29,9 +31,10 @@@ import org.apache.tajo.annotation.Nulla import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse; import org.apache.tajo.ipc.ClientProtos.QueryIdRequest; +import org.apache.tajo.ipc.ClientProtos.RequestResult; import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.QueryMasterClientProtocol; - import org.apache.tajo.master.querymaster.QueryMasterTask; + import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.rpc.BlockingRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.NetUtils; http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --cc tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index 0000000,a125196..4526863 mode 000000,100644..100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@@ -1,0 -1,125 +1,125 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.tajo.querymaster; + + import org.apache.tajo.*; + import org.apache.tajo.algebra.Expr; + import org.apache.tajo.benchmark.TPCH; + import org.apache.tajo.catalog.CatalogService; + import org.apache.tajo.client.TajoClient; + import org.apache.tajo.client.TajoClientImpl; + import org.apache.tajo.conf.TajoConf; + import org.apache.tajo.engine.parser.SQLAnalyzer; + import org.apache.tajo.engine.planner.global.GlobalPlanner; + import org.apache.tajo.engine.planner.global.MasterPlan; + import org.apache.tajo.engine.query.QueryContext; + import org.apache.tajo.master.event.QueryEvent; + import org.apache.tajo.master.event.QueryEventType; + import org.apache.tajo.session.Session; + import org.apache.tajo.plan.LogicalOptimizer; + import org.apache.tajo.plan.LogicalPlan; + import org.apache.tajo.plan.LogicalPlanner; + import org.junit.AfterClass; + import org.junit.BeforeClass; + import org.junit.Test; + + import java.io.File; + import java.io.IOException; + + import static org.junit.Assert.*; + + public class TestKillQuery { + private static TajoTestingCluster cluster; + private static TajoConf conf; + private static TajoClient client; + + @BeforeClass + public static void setUp() throws Exception { + cluster = new TajoTestingCluster(); + cluster.startMiniClusterInLocal(1); + conf = cluster.getConfiguration(); + client = new TajoClientImpl(cluster.getConfiguration()); + File file = TPCH.getDataFile("lineitem"); + client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.lineitem")); + } + + @AfterClass + public static void tearDown() throws IOException { + if (client != null) client.close(); + if (cluster != null) cluster.shutdownMiniCluster(); + } + + @Test + public final void testKillQueryFromInitState() throws Exception { + SQLAnalyzer analyzer = new SQLAnalyzer(); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf); + Session session = LocalTajoTestingUtility.createDummySession(); + CatalogService catalog = cluster.getMaster().getCatalog(); + String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; + + LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); ++ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); + Expr expr = analyzer.parse(query); + LogicalPlan plan = planner.createPlan(defaultContext, expr); + + optimizer.optimize(plan); + + QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0); + QueryContext queryContext = new QueryContext(conf); + MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); + GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog); + globalPlanner.build(masterPlan); + + QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster(); + QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(), + queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson()); + + queryMasterTask.init(conf); + queryMasterTask.getQueryTaskContext().getDispatcher().start(); + queryMasterTask.startQuery(); + + try{ + cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_RUNNING, 2); + } finally { + assertEquals(TajoProtos.QueryState.QUERY_RUNNING, queryMasterTask.getQuery().getSynchronizedState()); + } + + Stage stage = queryMasterTask.getQuery().getStages().iterator().next(); + assertNotNull(stage); + + try{ + cluster.waitForStageState(stage, StageState.INITED, 2); + } finally { + assertEquals(StageState.INITED, stage.getSynchronizedState()); + } + + // fire kill event + Query q = queryMasterTask.getQuery(); + q.handle(new QueryEvent(queryId, QueryEventType.KILL)); + + try{ + cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50); + } finally { + assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState()); + } + queryMasterTask.stop(); + } + }
