TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. (jinho) Closes #331
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6582d865 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6582d865 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6582d865 Branch: refs/heads/index_support Commit: 6582d8656a3e8147984a86a9e62688b52ec1f681 Parents: 7615b75 Author: jhkim <[email protected]> Authored: Thu Jan 8 16:18:31 2015 +0900 Committer: jhkim <[email protected]> Committed: Thu Jan 8 16:18:31 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/benchmark/TPCH.java | 22 ++ .../tajo/master/DefaultTaskScheduler.java | 6 +- .../apache/tajo/master/TajoAsyncDispatcher.java | 232 ------------------- .../tajo/master/event/QueryStopEvent.java | 47 ++++ .../master/querymaster/QueryInProgress.java | 14 +- .../tajo/master/querymaster/QueryJobEvent.java | 1 + .../master/querymaster/QueryJobManager.java | 16 +- .../tajo/master/querymaster/QueryMaster.java | 17 +- .../master/querymaster/QueryMasterTask.java | 44 ++-- .../apache/tajo/master/querymaster/Stage.java | 8 +- .../java/org/apache/tajo/QueryTestCaseBase.java | 6 + .../org/apache/tajo/TajoTestingCluster.java | 8 +- .../test/java/org/apache/tajo/TpchTestBase.java | 8 +- .../org/apache/tajo/cli/tsql/TestTajoCli.java | 4 +- .../java/org/apache/tajo/jdbc/TestTajoJdbc.java | 29 ++- .../tajo/master/querymaster/TestKillQuery.java | 37 ++- .../tajo/scheduler/TestFifoScheduler.java | 43 ++-- .../org/apache/tajo/worker/TestHistory.java | 44 ++-- 19 files changed, 236 insertions(+), 353 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9ca4579..fe777a7 100644 --- a/CHANGES +++ b/CHANGES @@ -27,6 +27,9 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. + (jinho) + TAJO-1285: Refactoring Magic Number to HAConstants. (DaeMyung Kang via jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java index f4b4d6a..e2ea25c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java +++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java @@ -18,6 +18,7 @@ package org.apache.tajo.benchmark; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; @@ -33,8 +34,10 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.storage.StorageConstants; +import java.io.File; import java.io.IOException; import java.sql.SQLException; +import java.util.List; import java.util.Map; public class TPCH extends BenchmarkSet { @@ -225,4 +228,23 @@ public class TPCH extends BenchmarkSet { throw new ServiceException(s); } } + + public static List<String> getDataFilePaths(String... tables) { + List<String> tablePaths = Lists.newArrayList(); + File file; + for (String table : tables) { + file = getDataFile(table); + tablePaths.add(file.getAbsolutePath()); + } + return tablePaths; + } + + public static File getDataFile(String table) { + File file = new File("src/test/tpch/" + table + ".tbl"); + if (!file.exists()) { + file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + table + + ".tbl"); + } + return file; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index 1cd6587..d47c93a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -145,8 +145,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } // Return all of request callbacks instantly. - for (TaskRequestEvent req : taskRequests.taskRequestQueue) { - req.getCallback().run(stopTaskRunnerReq); + if(taskRequests != null){ + for (TaskRequestEvent req : taskRequests.taskRequestQueue) { + req.getCallback().run(stopTaskRunnerReq); + } } LOG.info("Task Scheduler stopped"); http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java deleted file mode 100644 index 751b21b..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -public class TajoAsyncDispatcher extends AbstractService implements Dispatcher { - - private static final Log LOG = LogFactory.getLog(TajoAsyncDispatcher.class); - - private final BlockingQueue<Event> eventQueue; - private volatile boolean stopped = false; - - private Thread eventHandlingThread; - protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; - private boolean exitOnDispatchException; - - private String id; - - public TajoAsyncDispatcher(String id) { - this(id, new LinkedBlockingQueue<Event>()); - } - - public TajoAsyncDispatcher(String id, BlockingQueue<Event> eventQueue) { - super(TajoAsyncDispatcher.class.getName()); - this.id = id; - this.eventQueue = eventQueue; - this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>(); - } - - Runnable createThread() { - return new Runnable() { - @Override - public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { - Event event; - try { - event = eventQueue.take(); - if(LOG.isDebugEnabled()) { - LOG.debug(id + ",event take:" + event.getType() + "," + event); - } - } catch(InterruptedException ie) { - if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted"); - } - return; - } - dispatch(event); - } - } - }; - } - - @Override - public synchronized void init(Configuration conf) { - this.exitOnDispatchException = - conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, - Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); - super.init(conf); - } - - @Override - public void start() { - //start all the components - super.start(); - eventHandlingThread = new Thread(createThread()); - eventHandlingThread.setName("AsyncDispatcher event handler"); - eventHandlingThread.start(); - - LOG.info("AsyncDispatcher started:" + id); - } - - @Override - public synchronized void stop() { - if(stopped) { - return; - } - stopped = true; - if (eventHandlingThread != null) { - eventHandlingThread.interrupt(); - try { - eventHandlingThread.join(); - } catch (InterruptedException ie) { - LOG.warn("Interrupted Exception while stopping"); - } - } - - // stop all the components - super.stop(); - - LOG.info("AsyncDispatcher stopped:" + id); - } - - @SuppressWarnings("unchecked") - protected void dispatch(Event event) { - //all events go thru this loop - if (LOG.isDebugEnabled()) { - LOG.debug("Dispatching the event " + event.getClass().getName() + "." - + event.toString()); - } - Class<? extends Enum> type = event.getType().getDeclaringClass(); - - try{ - EventHandler handler = eventDispatchers.get(type); - if(handler != null) { - handler.handle(event); - } else { - throw new Exception("No handler for registered for " + type); - } - } catch (Throwable t) { - //TODO Maybe log the state of the queue - LOG.fatal("Error in dispatcher thread:" + event.getType(), t); - if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false) { - LOG.info("Exiting, bye.."); - System.exit(-1); - } - } finally { - } - } - - @SuppressWarnings("unchecked") - @Override - public void register(Class<? extends Enum> eventType, - EventHandler handler) { - /* check to see if we have a listener registered */ - EventHandler<Event> registeredHandler = (EventHandler<Event>) - eventDispatchers.get(eventType); - LOG.debug("Registering " + eventType + " for " + handler.getClass()); - if (registeredHandler == null) { - eventDispatchers.put(eventType, handler); - } else if (!(registeredHandler instanceof MultiListenerHandler)){ - /* for multiple listeners of an event add the multiple listener handler */ - MultiListenerHandler multiHandler = new MultiListenerHandler(); - multiHandler.addHandler(registeredHandler); - multiHandler.addHandler(handler); - eventDispatchers.put(eventType, multiHandler); - } else { - /* already a multilistener, just add to it */ - MultiListenerHandler multiHandler - = (MultiListenerHandler) registeredHandler; - multiHandler.addHandler(handler); - } - } - - @Override - public EventHandler getEventHandler() { - return new GenericEventHandler(); - } - - class GenericEventHandler implements EventHandler<Event> { - public void handle(Event event) { - /* all this method does is enqueue all the events onto the queue */ - int qSize = eventQueue.size(); - if (qSize !=0 && qSize %1000 == 0) { - LOG.info("Size of event-queue is " + qSize); - } - int remCapacity = eventQueue.remainingCapacity(); - if (remCapacity < 1000) { - LOG.warn("Very low remaining capacity in the event-queue: " - + remCapacity); - } - try { - if(LOG.isDebugEnabled()) { - LOG.debug(id + ",add event:" + - event.getType() + "," + event + "," + - (eventHandlingThread == null ? "null" : eventHandlingThread.isAlive())); - } - eventQueue.put(event); - } catch (InterruptedException e) { - if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted", e); - } - throw new YarnRuntimeException(e); - } - } - } - - /** - * Multiplexing an event. Sending it to different handlers that - * are interested in the event. - */ - static class MultiListenerHandler implements EventHandler<Event> { - List<EventHandler<Event>> listofHandlers; - - public MultiListenerHandler() { - listofHandlers = new ArrayList<EventHandler<Event>>(); - } - - @Override - public void handle(Event event) { - for (EventHandler<Event> handler: listofHandlers) { - handler.handle(event); - } - } - - void addHandler(EventHandler<Event> handler) { - listofHandlers.add(handler); - } - - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java new file mode 100644 index 0000000..6d57d4a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tajo.QueryId; + +/** + * This event is conveyed to QueryMaster. + */ +public class QueryStopEvent extends AbstractEvent { + public enum EventType { + QUERY_STOP + } + + private final QueryId queryId; + + public QueryStopEvent(QueryId queryId) { + super(EventType.QUERY_STOP); + this.queryId = queryId; + } + + public QueryId getQueryId() { + return queryId; + } + + @Override + public String toString() { + return getClass().getName() + "," + getType() + "," + queryId; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index ca0bd72..0a87990 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -23,16 +23,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto; -import org.apache.tajo.master.TajoAsyncDispatcher; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.master.session.Session; @@ -55,7 +54,7 @@ public class QueryInProgress extends CompositeService { private Session session; - private TajoAsyncDispatcher dispatcher; + private AsyncDispatcher dispatcher; private LogicalRootNode plan; @@ -88,7 +87,7 @@ public class QueryInProgress extends CompositeService { @Override public void init(Configuration conf) { - dispatcher = new TajoAsyncDispatcher("QueryInProgress:" + queryId); + dispatcher = new AsyncDispatcher(); this.addService(dispatcher); dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler()); @@ -193,8 +192,6 @@ public class QueryInProgress extends CompositeService { new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo())); } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) { submmitQueryToMaster(); - } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) { - stop(); } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { kill(); } @@ -255,7 +252,7 @@ public class QueryInProgress extends CompositeService { } public boolean isStarted() { - return this.querySubmitted.get(); + return !stopped.get() && this.querySubmitted.get(); } private void heartbeat(QueryInfo queryInfo) { @@ -289,7 +286,8 @@ public class QueryInProgress extends CompositeService { if (isFinishState(this.queryInfo.getQueryState())) { - stop(); + masterContext.getQueryJobManager().getEventHandler().handle( + new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo)); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java index 811de1b..ce30ec7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java @@ -37,6 +37,7 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> { QUERY_JOB_START, QUERY_JOB_HEARTBEAT, QUERY_JOB_FINISH, + QUERY_JOB_STOP, QUERY_MASTER_START, QUERY_MASTER_STOP, QUERY_JOB_KILL http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index 34a0d01..13f6456 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -40,7 +40,6 @@ import org.apache.tajo.scheduler.SimpleFifoScheduler; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -187,15 +186,16 @@ public class QueryJobManager extends CompositeService { LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]"); return; } - if(queryInProgress.isStarted()){ + + if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { + stopQuery(event.getQueryInfo().getQueryId()); + } else if (queryInProgress.isStarted()) { queryInProgress.getEventHandler().handle(event); - } else { - if(event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL){ - scheduler.removeQuery(queryInProgress.getQueryId()); - queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { + scheduler.removeQuery(queryInProgress.getQueryId()); + queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); - stopQuery(queryInProgress.getQueryId()); - } + stopQuery(queryInProgress.getQueryId()); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 7623026..641de78 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; @@ -35,8 +36,8 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.TajoAsyncDispatcher; import org.apache.tajo.master.event.QueryStartEvent; +import org.apache.tajo.master.event.QueryStopEvent; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; @@ -66,7 +67,7 @@ public class QueryMaster extends CompositeService implements EventHandler { private Clock clock; - private TajoAsyncDispatcher dispatcher; + private AsyncDispatcher dispatcher; private GlobalPlanner globalPlanner; @@ -110,12 +111,13 @@ public class QueryMaster extends CompositeService implements EventHandler { clock = new SystemClock(); - this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis()); + this.dispatcher = new AsyncDispatcher(); addIfService(dispatcher); globalPlanner = new GlobalPlanner(systemConf, workerContext); dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler()); + dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler()); } catch (Throwable t) { LOG.error(t.getMessage(), t); @@ -360,7 +362,7 @@ public class QueryMaster extends CompositeService implements EventHandler { return eventExecutor; } - public TajoAsyncDispatcher getDispatcher() { + public AsyncDispatcher getDispatcher() { return dispatcher; } @@ -491,6 +493,13 @@ public class QueryMaster extends CompositeService implements EventHandler { } } + private class QueryStopEventHandler implements EventHandler<QueryStopEvent> { + @Override + public void handle(QueryStopEvent event) { + queryMasterContext.stopQuery(event.getQueryId()); + } + } + class QueryHeartbeatThread extends Thread { public QueryHeartbeatThread() { super("QueryHeartbeatThread"); http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 720d60a..9c789a5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.tajo.*; @@ -37,33 +38,31 @@ import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; -import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.NodeType; -import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.TajoAsyncDispatcher; import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.master.session.Session; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; -import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageProperty; import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.metrics.TajoMetrics; import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; import org.apache.tajo.worker.AbstractResourceAllocator; @@ -71,7 +70,6 @@ import org.apache.tajo.worker.TajoResourceAllocator; import java.io.IOException; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -104,7 +102,7 @@ public class QueryMasterTask extends CompositeService { private String logicalPlanJson; - private TajoAsyncDispatcher dispatcher; + private AsyncDispatcher dispatcher; private final long querySubmitTime; @@ -154,7 +152,7 @@ public class QueryMasterTask extends CompositeService { } addService(resourceAllocator); - dispatcher = new TajoAsyncDispatcher(queryId.toString()); + dispatcher = new AsyncDispatcher(); addService(dispatcher); dispatcher.register(StageEventType.class, new StageEventDispatcher()); @@ -200,8 +198,6 @@ public class QueryMasterTask extends CompositeService { LOG.fatal(t.getMessage(), t); } - CallFuture future = new CallFuture(); - RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf()); NettyClientBase tmClient = null; try { @@ -225,21 +221,12 @@ public class QueryMasterTask extends CompositeService { tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), TajoMasterProtocol.class, true); } - - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); - masterClientService.stopQueryMaster(null, queryId.getProto(), future); } catch (Exception e) { LOG.error(e.getMessage(), e); } finally { connPool.releaseConnection(tmClient); } - try { - future.get(3, TimeUnit.SECONDS); - } catch (Throwable t) { - LOG.warn(t); - } - super.stop(); //TODO change report to tajo master @@ -339,7 +326,8 @@ public class QueryMasterTask extends CompositeService { } } LOG.info("Query final state: " + query.getSynchronizedState()); - queryMasterContext.stopQuery(queryId); + + queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId)); } } @@ -620,7 +608,7 @@ public class QueryMasterTask extends CompositeService { return eventHandler; } - public TajoAsyncDispatcher getDispatcher() { + public AsyncDispatcher getDispatcher() { return dispatcher; } http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java index e421417..0515e72 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java @@ -736,16 +736,16 @@ public class Stage implements EventHandler<StageEvent> { stage.finalizeStats(); state = StageState.SUCCEEDED; } else { + ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock()); + DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId()); + setShuffleIfNecessary(stage, channel); + initTaskScheduler(stage); // execute pre-processing asyncronously stage.getContext().getQueryMasterContext().getEventExecutor() .submit(new Runnable() { @Override public void run() { try { - ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock()); - DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId()); - setShuffleIfNecessary(stage, channel); - initTaskScheduler(stage); schedule(stage); stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum(); LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled"); http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 875e450..1605560 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -201,6 +201,12 @@ public class QueryTestCaseBase { client.close(); } + @Before + public void printTestName() { + /* protect a travis stalled build */ + System.out.println("Run: " + name.getMethodName()); + } + public QueryTestCaseBase() { // hive 0.12 does not support quoted identifier. // So, we use lower case database names when Tajo uses HCatalogStore. http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 841be45..0d2f6fa 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -164,8 +164,6 @@ public class TajoTestingCluster { if (!StringUtils.isEmpty(LOG_LEVEL)) { Level defaultLevel = Logger.getRootLogger().getLevel(); Logger.getLogger("org.apache.tajo").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); - Logger.getLogger("org.apache.tajo.master.TajoAsyncDispatcher").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), - defaultLevel)); Logger.getLogger("org.apache.hadoop").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); Logger.getLogger("org.apache.zookeeper").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); Logger.getLogger("BlockStateChange").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); @@ -630,8 +628,10 @@ public class TajoTestingCluster { this.clusterTestBuildDir = null; } - hbaseUtil.stopZooKeeperCluster(); - hbaseUtil.stopHBaseCluster(); + if(hbaseUtil != null) { + hbaseUtil.stopZooKeeperCluster(); + hbaseUtil.stopHBaseCluster(); + } LOG.info("Minicluster is down"); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java index 0f713e5..055dd02 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java @@ -22,10 +22,10 @@ import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.benchmark.TPCH; -import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.catalog.Schema; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; import java.io.File; import java.io.IOException; @@ -73,11 +73,7 @@ public class TpchTestBase { tables = new String[names.length][]; File file; for (int i = 0; i < names.length; i++) { - file = new File("src/test/tpch/" + names[i] + ".tbl"); - if(!file.exists()) { - file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i] - + ".tbl"); - } + file = TPCH.getDataFile(names[i]); tables[i] = FileUtil.readTextFile(file).split("\n"); paths[i] = file.getAbsolutePath(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java index b14bfa9..aff1677 100644 --- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java +++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java @@ -321,14 +321,14 @@ public class TestTajoCli { setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName()); ByteArrayOutputStream out = new ByteArrayOutputStream(); - tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out); + TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out); tajoCli.executeMetaCommand("\\admin -showmasters"); String consoleResult = new String(out.toByteArray()); String masterAddress = tajoCli.getContext().getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS); String host = masterAddress.split(":")[0]; - + tajoCli.close(); assertEquals(consoleResult, host + "\n"); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java index 99baeba..1c763e2 100644 --- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java +++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java @@ -26,7 +26,6 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.client.QueryClient; -import org.apache.tajo.conf.TajoConf; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -34,11 +33,13 @@ import org.junit.experimental.categories.Category; import java.net.InetSocketAddress; import java.sql.*; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; @Category(IntegrationTest.class) public class TestTajoJdbc extends QueryTestCaseBase { @@ -113,6 +114,9 @@ public class TestTajoJdbc extends QueryTestCaseBase { if (stmt != null) { stmt.close(); } + if (conn != null) { + conn.close(); + } } } @@ -194,6 +198,9 @@ public class TestTajoJdbc extends QueryTestCaseBase { if (stmt != null) { stmt.close(); } + if (conn != null) { + conn.close(); + } } } @@ -494,11 +501,11 @@ public class TestTajoJdbc extends QueryTestCaseBase { int result; Statement stmt = null; ResultSet res = null; - + Connection conn = null; try { String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(), DEFAULT_DATABASE_NAME); - Connection conn = DriverManager.getConnection(connUri); + conn = DriverManager.getConnection(connUri); assertTrue(conn.isValid(100)); stmt = conn.createStatement(); @@ -532,6 +539,10 @@ public class TestTajoJdbc extends QueryTestCaseBase { if (stmt != null) { stmt.close(); } + + if(conn != null) { + conn.close(); + } } } @@ -539,11 +550,11 @@ public class TestTajoJdbc extends QueryTestCaseBase { public void testSortWithDateTime() throws Exception { Statement stmt = null; ResultSet res = null; + Connection conn = null; int result; // skip this test if catalog uses HCatalogStore. // It is because HCatalogStore does not support Time data type. - try { if (!testingCluster.isHCatalogStoreRunning()) { executeDDL("create_table_with_date_ddl.sql", "table1"); @@ -551,7 +562,7 @@ public class TestTajoJdbc extends QueryTestCaseBase { String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(), "TestTajoJdbc"); - Connection conn = DriverManager.getConnection(connUri); + conn = DriverManager.getConnection(connUri); assertTrue(conn.isValid(100)); stmt = conn.createStatement(); @@ -576,6 +587,10 @@ public class TestTajoJdbc extends QueryTestCaseBase { if (stmt != null) { stmt.close(); } + + if(conn != null) { + conn.close(); + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java index c1f4178..8ca4cff 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java @@ -20,34 +20,51 @@ package org.apache.tajo.master.querymaster; import org.apache.tajo.*; import org.apache.tajo.algebra.Expr; +import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.parser.SQLAnalyzer; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.event.QueryEvent; import org.apache.tajo.master.event.QueryEventType; import org.apache.tajo.master.session.Session; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.*; -@Category(IntegrationTest.class) public class TestKillQuery { private static TajoTestingCluster cluster; private static TajoConf conf; + private static TajoClient client; @BeforeClass public static void setUp() throws Exception { - cluster = TpchTestBase.getInstance().getTestingCluster(); + cluster = new TajoTestingCluster(); + cluster.startMiniClusterInLocal(1); conf = cluster.getConfiguration(); + client = new TajoClientImpl(cluster.getConfiguration()); + File file = TPCH.getDataFile("lineitem"); + client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.lineitem")); + } + + @AfterClass + public static void tearDown() throws IOException { + if (client != null) client.close(); + if (cluster != null) cluster.shutdownMiniCluster(); } @Test @@ -56,7 +73,7 @@ public class TestKillQuery { QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf); Session session = LocalTajoTestingUtility.createDummySession(); CatalogService catalog = cluster.getMaster().getCatalog(); - String query = "select l_orderkey from lineitem group by l_orderkey"; + String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; LogicalPlanner planner = new LogicalPlanner(catalog); LogicalOptimizer optimizer = new LogicalOptimizer(conf); @@ -99,7 +116,7 @@ public class TestKillQuery { q.handle(new QueryEvent(queryId, QueryEventType.KILL)); try{ - cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 10); + cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50); } finally { assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java index 18764c2..acd6b71 100644 --- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java +++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java @@ -18,7 +18,10 @@ package org.apache.tajo.scheduler; -import org.apache.tajo.*; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.client.TajoClientUtil; @@ -27,47 +30,52 @@ import org.apache.tajo.ipc.ClientProtos; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.experimental.categories.Category; +import java.io.File; import java.sql.ResultSet; import static org.junit.Assert.*; -@Category(IntegrationTest.class) public class TestFifoScheduler { private static TajoTestingCluster cluster; private static TajoConf conf; private static TajoClient client; + private static String query = + "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; @BeforeClass public static void setUp() throws Exception { - cluster = TpchTestBase.getInstance().getTestingCluster(); + cluster = new TajoTestingCluster(); + cluster.startMiniClusterInLocal(1); conf = cluster.getConfiguration(); - client = new TajoClientImpl(conf); + client = new TajoClientImpl(cluster.getConfiguration()); + File file = TPCH.getDataFile("lineitem"); + client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.lineitem")); } @AfterClass public static void tearDown() throws Exception { - client.close(); + if (client != null) client.close(); + if (cluster != null) cluster.shutdownMiniCluster(); } @Test public final void testKillScheduledQuery() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res = client.executeQuery(query); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query); QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); cluster.waitForQueryRunning(queryId); client.killQuery(queryId2); assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState()); - - client.killQuery(queryId); // cleanup } @Test public final void testForwardedQuery() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res = client.executeQuery(query); ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1"); assertTrue(res.getIsForwarded()); assertFalse(res2.getIsForwarded()); @@ -79,16 +87,14 @@ public class TestFifoScheduler { assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState()); ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2); assertNotNull(resSet); - - client.killQuery(queryId); //cleanup } @Test public final void testScheduledQuery() throws Exception { ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(1) from lineitem"); - ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select sleep(1) from lineitem"); - ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query); + ClientProtos.SubmitQueryResponse res3 = client.executeQuery(query); + ClientProtos.SubmitQueryResponse res4 = client.executeQuery(query); QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); @@ -103,9 +109,8 @@ public class TestFifoScheduler { assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState()); assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId4).getState()); - client.killQuery(queryId2); - client.killQuery(queryId3); client.killQuery(queryId4); - client.killQuery(queryId); + client.killQuery(queryId3); + client.killQuery(queryId2); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/6582d865/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java index d320077..77aa1d4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java @@ -20,19 +20,20 @@ package org.apache.tajo.worker; import com.google.protobuf.ServiceException; import org.apache.hadoop.service.Service; -import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.TpchTestBase; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.querymaster.QueryInfo; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -40,29 +41,34 @@ import java.util.Map; import static org.junit.Assert.*; public class TestHistory { - private TajoTestingCluster cluster; - private TajoMaster master; - private TajoConf conf; - private TajoClient client; - - @Before - public void setUp() throws Exception { - cluster = TpchTestBase.getInstance().getTestingCluster(); + private static TajoTestingCluster cluster; + private static TajoMaster master; + private static TajoConf conf; + private static TajoClient client; + + @BeforeClass + public static void setUp() throws Exception { + cluster = new TajoTestingCluster(); + cluster.startMiniClusterInLocal(1); master = cluster.getMaster(); conf = cluster.getConfiguration(); - client = new TajoClientImpl(conf); + client = new TajoClientImpl(cluster.getConfiguration()); + File file = TPCH.getDataFile("lineitem"); + client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.lineitem")); } - @After - public void tearDown() { - client.close(); + @AfterClass + public static void tearDown() throws IOException { + if (client != null) client.close(); + if (cluster != null) cluster.shutdownMiniCluster(); } - @Test public final void testTaskRunnerHistory() throws IOException, ServiceException, InterruptedException { int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size(); - client.executeQueryAndGetResult("select sleep(1) from lineitem"); + client.executeQueryAndGetResult("select count(*) from lineitem"); Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); assertTrue(finishedQueries.size() > beforeFinishedQueriesCount); @@ -89,7 +95,7 @@ public class TestHistory { @Test public final void testTaskHistory() throws IOException, ServiceException, InterruptedException { int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size(); - client.executeQueryAndGetResult("select sleep(1) from lineitem"); + client.executeQueryAndGetResult("select count(*) from lineitem"); Collection<QueryInfo> finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); assertTrue(finishedQueries.size() > beforeFinishedQueriesCount);
