DRILL-5902: Queries encounter random failures due to RPC connection timeouts
close apache/drill#1113 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b4d2a770 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b4d2a770 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b4d2a770 Branch: refs/heads/master Commit: b4d2a77038421b243970da71655311076c0a5a43 Parents: 4f203ea Author: Vlad Rozov <[email protected]> Authored: Mon Feb 5 19:15:56 2018 -0800 Committer: Aman Sinha <[email protected]> Committed: Fri Feb 23 14:15:00 2018 -0800 ---------------------------------------------------------------------- .../server/rest/profile/ProfileResources.java | 5 +-- .../org/apache/drill/exec/work/WorkManager.java | 39 ++++++++++++++++++++ .../exec/work/batch/ControlMessageHandler.java | 4 +- .../exec/work/foreman/QueryStateProcessor.java | 14 +++---- .../apache/drill/exec/work/user/UserWorker.java | 5 +-- 5 files changed, 48 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/b4d2a770/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java index 8751ee6..ec06f0e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java @@ -383,10 +383,7 @@ public class ProfileResources { QueryId id = QueryIdHelper.getQueryIdFromString(queryId); // first check local running - Foreman f = work.getBee().getForemanForQueryId(id); - if(f != null){ - checkOrThrowQueryCancelAuthorization(f.getQueryContext().getQueryUserName(), queryId); - f.cancel(); + if 
(work.getBee().cancelForeman(id, principal)) { return String.format("Cancelled query %s on locally running node.", queryId); } http://git-wip-us.apache.org/repos/asf/drill/blob/b4d2a770/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index d75668c..7058c62 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.drill.common.SelfCleaningRunnable; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.proto.BitControl.FragmentStatus; @@ -37,6 +38,7 @@ import org.apache.drill.exec.rpc.control.WorkEventBus; import org.apache.drill.exec.rpc.data.DataConnectionCreator; import org.apache.drill.exec.server.BootStrapContext; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal; import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.work.batch.ControlMessageHandler; import org.apache.drill.exec.work.foreman.Foreman; @@ -259,6 +261,43 @@ public class WorkManager implements AutoCloseable { executor.execute(runnable); } + public boolean cancelForeman(final QueryId queryId, DrillUserPrincipal principal) { + Preconditions.checkNotNull(queryId); + + final Foreman foreman = queries.get(queryId); + if (foreman == null) { + return false; + } + + final String queryIdString = QueryIdHelper.getQueryId(queryId); + + if (principal != 
null && !principal.canManageQueryOf(foreman.getQueryContext().getQueryUserName())) { + throw UserException.permissionError() + .message("Not authorized to cancel the query '%s'", queryIdString) + .build(logger); + } + + executor.execute(new Runnable() + { + @Override + public void run() + { + final Thread currentThread = Thread.currentThread(); + final String originalName = currentThread.getName(); + try { + currentThread.setName(queryIdString + ":foreman:cancel"); + logger.debug("Canceling foreman"); + foreman.cancel(); + } catch (Throwable t) { + logger.warn("Exception while canceling foreman", t); + } finally { + currentThread.setName(originalName); + } + } + }); + return true; + } + /** * Remove the given Foreman from the running query list. * http://git-wip-us.apache.org/repos/asf/drill/blob/b4d2a770/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java index 7865b53..e562b16 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java @@ -98,9 +98,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> case RpcType.REQ_QUERY_CANCEL_VALUE: { final QueryId queryId = get(pBody, QueryId.PARSER); - final Foreman foreman = bee.getForemanForQueryId(queryId); - if (foreman != null) { - foreman.cancel(); + if (bee.cancelForeman(queryId, null)) { sender.send(ControlRpcConfig.OK); } else { sender.send(ControlRpcConfig.FAIL); http://git-wip-us.apache.org/repos/asf/drill/blob/b4d2a770/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java index 2443139..0ef4f37 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java @@ -115,8 +115,9 @@ public class QueryStateProcessor implements AutoCloseable { } /** - * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be terminated. - * For preparing, planning and enqueued states we cancel immediately since these states are done locally. + * Transition query to {@link QueryState#CANCELLATION_REQUESTED CANCELLATION_REQUESTED} state if it is + * not already in the terminal states {@link QueryState#CANCELED CANCELED}, {@link QueryState#COMPLETED COMPLETED} or + * {@link QueryState#FAILED FAILED}. See the implementation of {@link #moveToState(QueryState, Exception)} for details. * * Note this can be called from outside of run() on another thread, or after run() completes */ @@ -125,20 +126,17 @@ public class QueryStateProcessor implements AutoCloseable { case PREPARING: case PLANNING: case ENQUEUED: - moveToState(QueryState.CANCELLATION_REQUESTED, null); - return; - case STARTING: case RUNNING: - addToEventQueue(QueryState.CANCELLATION_REQUESTED, null); - return; + moveToState(QueryState.CANCELLATION_REQUESTED, null); + break; case CANCELLATION_REQUESTED: case CANCELED: case COMPLETED: case FAILED: // nothing to do - return; + break; default: throw new IllegalStateException("Unable to cancel the query.
Unexpected query state -> " + state); http://git-wip-us.apache.org/repos/asf/drill/blob/b4d2a770/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java index 04135dc..b105b78 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java @@ -81,10 +81,7 @@ public class UserWorker{ } public Ack cancelQuery(QueryId query) { - Foreman foreman = bee.getForemanForQueryId(query); - if(foreman != null) { - foreman.cancel(); - } + bee.cancelForeman(query, null); return Acks.OK; }
