DRILL-5902: Queries encounter random failure due to RPC connection timed out

close apache/drill#1113


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b4d2a770
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b4d2a770
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b4d2a770

Branch: refs/heads/master
Commit: b4d2a77038421b243970da71655311076c0a5a43
Parents: 4f203ea
Author: Vlad Rozov <vro...@apache.org>
Authored: Mon Feb 5 19:15:56 2018 -0800
Committer: Aman Sinha <asi...@maprtech.com>
Committed: Fri Feb 23 14:15:00 2018 -0800

----------------------------------------------------------------------
 .../server/rest/profile/ProfileResources.java   |  5 +--
 .../org/apache/drill/exec/work/WorkManager.java | 39 ++++++++++++++++++++
 .../exec/work/batch/ControlMessageHandler.java  |  4 +-
 .../exec/work/foreman/QueryStateProcessor.java  | 14 +++----
 .../apache/drill/exec/work/user/UserWorker.java |  5 +--
 5 files changed, 48 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b4d2a770/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
index 8751ee6..ec06f0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -383,10 +383,7 @@ public class ProfileResources {
     QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
 
     // first check local running
-    Foreman f = work.getBee().getForemanForQueryId(id);
-    if(f != null){
-      checkOrThrowQueryCancelAuthorization(f.getQueryContext().getQueryUserName(), queryId);
-      f.cancel();
+    if (work.getBee().cancelForeman(id, principal)) {
       return String.format("Cancelled query %s on locally running node.", queryId);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/b4d2a770/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index d75668c..7058c62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.drill.common.SelfCleaningRunnable;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -37,6 +38,7 @@ import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 import org.apache.drill.exec.work.foreman.Foreman;
@@ -259,6 +261,43 @@ public class WorkManager implements AutoCloseable {
       executor.execute(runnable);
     }
 
+    public boolean cancelForeman(final QueryId queryId, DrillUserPrincipal principal) {
+      Preconditions.checkNotNull(queryId);
+
+      final Foreman foreman = queries.get(queryId);
+      if (foreman == null) {
+        return false;
+      }
+
+      final String queryIdString = QueryIdHelper.getQueryId(queryId);
+
+      if (principal != null && !principal.canManageQueryOf(foreman.getQueryContext().getQueryUserName())) {
+        throw UserException.permissionError()
+            .message("Not authorized to cancel the query '%s'", queryIdString)
+            .build(logger);
+      }
+
+      executor.execute(new Runnable()
+      {
+        @Override
+        public void run()
+        {
+          final Thread currentThread = Thread.currentThread();
+          final String originalName = currentThread.getName();
+          try {
+            currentThread.setName(queryIdString + ":foreman:cancel");
+            logger.debug("Canceling foreman");
+            foreman.cancel();
+          } catch (Throwable t) {
+            logger.warn("Exception while canceling foreman", t);
+          } finally {
+            currentThread.setName(originalName);
+          }
+        }
+      });
+      return true;
+    }
+
     /**
      * Remove the given Foreman from the running query list.
      *

http://git-wip-us.apache.org/repos/asf/drill/blob/b4d2a770/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 7865b53..e562b16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -98,9 +98,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
 
     case RpcType.REQ_QUERY_CANCEL_VALUE: {
       final QueryId queryId = get(pBody, QueryId.PARSER);
-      final Foreman foreman = bee.getForemanForQueryId(queryId);
-      if (foreman != null) {
-        foreman.cancel();
+      if (bee.cancelForeman(queryId, null)) {
         sender.send(ControlRpcConfig.OK);
       } else {
         sender.send(ControlRpcConfig.FAIL);

http://git-wip-us.apache.org/repos/asf/drill/blob/b4d2a770/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
index 2443139..0ef4f37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
@@ -115,8 +115,9 @@ public class QueryStateProcessor implements AutoCloseable {
   }
 
   /**
-   * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be terminated.
-   * For preparing, planning and enqueued states we cancel immediately since these states are done locally.
+   * Transition query to {@link QueryState#CANCELLATION_REQUESTED CANCELLATION_REQUESTED} state if it is
+   * not already in the terminal states {@link QueryState#CANCELED, CANCELED}, {@link QueryState#COMPLETED, COMPLETED} or
+   * {@link QueryState#FAILED, FAILED}. See the implementation of {@link #moveToState(QueryState, Exception)} for details.
    *
    * Note this can be called from outside of run() on another thread, or after run() completes
    */
@@ -125,20 +126,17 @@ public class QueryStateProcessor implements AutoCloseable {
       case PREPARING:
       case PLANNING:
       case ENQUEUED:
-        moveToState(QueryState.CANCELLATION_REQUESTED, null);
-        return;
-
       case STARTING:
       case RUNNING:
-        addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
-        return;
+        moveToState(QueryState.CANCELLATION_REQUESTED, null);
+        break;
 
       case CANCELLATION_REQUESTED:
       case CANCELED:
       case COMPLETED:
       case FAILED:
         // nothing to do
-        return;
+        break;
 
       default:
         throw new IllegalStateException("Unable to cancel the query. Unexpected query state -> " + state);

http://git-wip-us.apache.org/repos/asf/drill/blob/b4d2a770/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 04135dc..b105b78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -81,10 +81,7 @@ public class UserWorker{
   }
 
   public Ack cancelQuery(QueryId query) {
-    Foreman foreman = bee.getForemanForQueryId(query);
-    if(foreman != null) {
-      foreman.cancel();
-    }
+    bee.cancelForeman(query, null);
     return Acks.OK;
   }
 

Reply via email to