Repository: lens Updated Branches: refs/heads/master 3e7a6f602 -> 9ace50b3e
LENS-1169: Improve the logic of stopping query service Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/9ace50b3 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/9ace50b3 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/9ace50b3 Branch: refs/heads/master Commit: 9ace50b3e657922de0430df5e1a44d0571ebd699 Parents: 3e7a6f6 Author: Rajat Khandelwal <pro...@apache.org> Authored: Wed Jun 8 19:05:05 2016 +0530 Committer: Rajat Khandelwal <rajatgupt...@gmail.com> Committed: Wed Jun 8 19:05:05 2016 +0530 ---------------------------------------------------------------------- .../server/query/QueryExecutionServiceImpl.java | 72 ++++++++++++++------ .../lens/server/query/QueryResultPurger.java | 10 ++- .../server/query/TestQueryResultPurger.java | 2 +- 3 files changed, 59 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/9ace50b3/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index 8fe02aa..23c7743 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -1223,6 +1223,22 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } } + private void awaitTermination(ExecutorService service) { + try { + service.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + log.info("Couldn't finish executor service within 1 minute: {}", service); + } + } + + private void awaitTermination(QueryResultPurger service) { + try { + service.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + log.info("Couldn't finish query result purger within 1 minute: {}", service); + } + } + /* * (non-Javadoc) * @@ -1230,23 +1246,29 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE */ public void prepareStopping() { super.prepareStopping(); - querySubmitter.interrupt(); - statusPoller.interrupt(); - queryPurger.interrupt(); - prepareQueryPurger.interrupt(); - } + Thread[] threadsToStop = new Thread[]{querySubmitter, statusPoller, queryPurger, prepareQueryPurger}; + // Nudge the threads to stop + for (Thread th : threadsToStop) { + th.interrupt(); + } - /* - * (non-Javadoc) - * - * @see org.apache.hive.service.CompositeService#stop() - */ - public synchronized void stop() { - super.stop(); + // Nudge executor pools to stop - waitingQueriesSelectionSvc.shutdown(); + // Hard shutdown, since it doesn't matter whether waiting queries were selected, all will be + // selected in the next restart + waitingQueriesSelectionSvc.shutdownNow(); + // Soft shutdown, Wait for current estimate tasks + estimatePool.shutdown(); + // Soft shutdown for result purger too. Purging shouldn't take much time. + if (null != queryResultPurger) { + queryResultPurger.shutdown(); + } + // Soft shutdown right now, will await termination in this method itself, since cancellation pool + // should be terminated before query state gets persisted. + queryCancellationPool.shutdown(); - for (Thread th : new Thread[]{querySubmitter, statusPoller, queryPurger, prepareQueryPurger}) { + // Join the threads. + for (Thread th : threadsToStop) { try { log.debug("Waiting for {}", th.getName()); th.join(); @@ -1254,15 +1276,21 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE log.error("Error waiting for thread: {}", th.getName(), e); } } + // Needs to be done before queries' states are persisted, hence doing here. Await of other + // executor services can be done after persistence, hence they are done in #stop + awaitTermination(queryCancellationPool); + } - estimatePool.shutdownNow(); - - if (null != queryResultPurger) { - queryResultPurger.stop(); - } - - queryCancellationPool.shutdown(); - + /* + * (non-Javadoc) + * + * @see org.apache.hive.service.CompositeService#stop() + */ + public synchronized void stop() { + super.stop(); + awaitTermination(waitingQueriesSelectionSvc); + awaitTermination(estimatePool); + awaitTermination(queryResultPurger); log.info("Query execution service stopped"); } http://git-wip-us.apache.org/repos/asf/lens/blob/9ace50b3/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java index 2be11ea..756c282 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java @@ -145,9 +145,9 @@ public class QueryResultPurger implements Runnable { /** * Stops query result purger */ - public void stop() { + public void shutdown() { if (null != queryResultPurgerExecutor) { - queryResultPurgerExecutor.shutdownNow(); + queryResultPurgerExecutor.shutdown(); log.info("Stopped query result purger."); } } @@ -174,4 +174,10 @@ public class QueryResultPurger implements Runnable { } return metricsService; } + + public void awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (null != queryResultPurgerExecutor) { + queryResultPurgerExecutor.awaitTermination(timeout, unit); + } + } } http://git-wip-us.apache.org/repos/asf/lens/blob/9ace50b3/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java index 9aeb645..c498ada 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java @@ -74,7 +74,7 @@ public class TestQueryResultPurger { QueryResultPurger queryResultPurger = new QueryResultPurger(); queryResultPurger.init(conf); Thread.sleep(2000); // sleep for 2 seconds, enough to run query purger - queryResultPurger.stop(); + queryResultPurger.shutdown(); verify(conf.get(LensConfConstants.RESULT_SET_PARENT_DIR), 1); verify(conf.get(LensConfConstants.RESULT_SET_PARENT_DIR) + "/" + conf.get(LensConfConstants.QUERY_HDFS_OUTPUT_PATH), 0);