Repository: lens
Updated Branches:
  refs/heads/master 3e7a6f602 -> 9ace50b3e


LENS-1169: Improve the logic of stopping query service


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/9ace50b3
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/9ace50b3
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/9ace50b3

Branch: refs/heads/master
Commit: 9ace50b3e657922de0430df5e1a44d0571ebd699
Parents: 3e7a6f6
Author: Rajat Khandelwal <pro...@apache.org>
Authored: Wed Jun 8 19:05:05 2016 +0530
Committer: Rajat Khandelwal <rajatgupt...@gmail.com>
Committed: Wed Jun 8 19:05:05 2016 +0530

----------------------------------------------------------------------
 .../server/query/QueryExecutionServiceImpl.java | 72 ++++++++++++++------
 .../lens/server/query/QueryResultPurger.java    | 10 ++-
 .../server/query/TestQueryResultPurger.java     |  2 +-
 3 files changed, 59 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/9ace50b3/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index 8fe02aa..23c7743 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -1223,6 +1223,22 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     }
   }
 
+  private void awaitTermination(ExecutorService service) {
+    try {
+      service.awaitTermination(1, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      log.info("Couldn't finish executor service within 1 minute: {}", 
service);
+    }
+  }
+
+  private void awaitTermination(QueryResultPurger service) {
+    try {
+      service.awaitTermination(1, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      log.info("Couldn't finish query result purger within 1 minute: {}", 
service);
+    }
+  }
+
   /*
    * (non-Javadoc)
    *
@@ -1230,23 +1246,29 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
    */
   public void prepareStopping() {
     super.prepareStopping();
-    querySubmitter.interrupt();
-    statusPoller.interrupt();
-    queryPurger.interrupt();
-    prepareQueryPurger.interrupt();
-  }
+    Thread[] threadsToStop = new Thread[]{querySubmitter, statusPoller, 
queryPurger, prepareQueryPurger};
+    // Nudge the threads to stop
+    for (Thread th : threadsToStop) {
+      th.interrupt();
+    }
 
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.hive.service.CompositeService#stop()
-   */
-  public synchronized void stop() {
-    super.stop();
+    // Nudge executor pools to stop
 
-    waitingQueriesSelectionSvc.shutdown();
+    // Hard shutdown, since it doesn't matter whether waiting queries were 
selected, all will be
+    // selected in the next restart
+    waitingQueriesSelectionSvc.shutdownNow();
+    // Soft shutdown, Wait for current estimate tasks
+    estimatePool.shutdown();
+    // Soft shutdown for result purger too. Purging shouldn't take much time.
+    if (null != queryResultPurger) {
+      queryResultPurger.shutdown();
+    }
+    // Soft shutdown right now, will await termination in this method itself, 
since cancellation pool
+    // should be terminated before query state gets persisted.
+    queryCancellationPool.shutdown();
 
-    for (Thread th : new Thread[]{querySubmitter, statusPoller, queryPurger, 
prepareQueryPurger}) {
+    // Join the threads.
+    for (Thread th : threadsToStop) {
       try {
         log.debug("Waiting for {}", th.getName());
         th.join();
@@ -1254,15 +1276,21 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
         log.error("Error waiting for thread: {}", th.getName(), e);
       }
     }
+    // Needs to be done before queries' states are persisted, hence doing 
here. Await of other
+    // executor services can be done after persistence, hence they are done in 
#stop
+    awaitTermination(queryCancellationPool);
+  }
 
-    estimatePool.shutdownNow();
-
-    if (null != queryResultPurger) {
-      queryResultPurger.stop();
-    }
-
-    queryCancellationPool.shutdown();
-
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hive.service.CompositeService#stop()
+   */
+  public synchronized void stop() {
+    super.stop();
+    awaitTermination(waitingQueriesSelectionSvc);
+    awaitTermination(estimatePool);
+    awaitTermination(queryResultPurger);
     log.info("Query execution service stopped");
   }
 

http://git-wip-us.apache.org/repos/asf/lens/blob/9ace50b3/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java
index 2be11ea..756c282 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java
@@ -145,9 +145,9 @@ public class QueryResultPurger implements Runnable {
   /**
    * Stops query result purger
    */
-  public void stop() {
+  public void shutdown() {
     if (null != queryResultPurgerExecutor) {
-      queryResultPurgerExecutor.shutdownNow();
+      queryResultPurgerExecutor.shutdown();
       log.info("Stopped query result purger.");
     }
   }
@@ -174,4 +174,10 @@ public class QueryResultPurger implements Runnable {
     }
     return metricsService;
   }
+
+  public void awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    if (null != queryResultPurgerExecutor) {
+      queryResultPurgerExecutor.awaitTermination(timeout, unit);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/9ace50b3/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java
 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java
index 9aeb645..c498ada 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java
@@ -74,7 +74,7 @@ public class TestQueryResultPurger {
     QueryResultPurger queryResultPurger = new QueryResultPurger();
     queryResultPurger.init(conf);
     Thread.sleep(2000); // sleep for 2 seconds, enough to run query purger
-    queryResultPurger.stop();
+    queryResultPurger.shutdown();
     verify(conf.get(LensConfConstants.RESULT_SET_PARENT_DIR), 1);
     verify(conf.get(LensConfConstants.RESULT_SET_PARENT_DIR) + "/" + 
conf.get(LensConfConstants.QUERY_HDFS_OUTPUT_PATH),
       0);

Reply via email to