Repository: drill
Updated Branches:
  refs/heads/master 1de6aed93 -> 6796006f2
DRILL-3190: Check for transitions from CANCELLATION_REQUESTED state

+ Moved state transition checks to QueryManager

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d9452d97
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d9452d97
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d9452d97

Branch: refs/heads/master
Commit: d9452d973e1bfd7f0aa2e6c28b3e8c72817628a3
Parents: 1de6aed
Author: Sudheesh Katkam <[email protected]>
Authored: Mon Jun 1 09:42:33 2015 -0700
Committer: vkorukanti <[email protected]>
Committed: Fri Jun 5 07:47:09 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/work/foreman/FragmentData.java | 28 +++++---------------
 .../drill/exec/work/foreman/QueryManager.java | 25 ++++++++++++++---
 2 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/d9452d97/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
index ceb77f0..9e07210 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -23,10 +23,9 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
 public class FragmentData {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class);
 
   private final boolean isLocal;
   private volatile FragmentStatus status;
@@ -49,31 +48,16 @@ public class FragmentData {
   }
 
   /**
-   * Update the status for this fragment. Also records last update and last progress time.
-   * @param status Updated status
-   * @return Whether or not the status update resulted in a FragmentState change.
+   * Update the status for this fragment. Also records last update and last progress time.
+   * @param newStatus Updated status
    */
-  public boolean setStatus(final FragmentStatus newStatus) {
+  public void setStatus(final FragmentStatus newStatus) {
     final long time = System.currentTimeMillis();
-    final FragmentState oldState = status.getProfile().getState();
-    final boolean inTerminalState = oldState == FragmentState.FAILED || oldState == FragmentState.FINISHED || oldState == FragmentState.CANCELLED;
-    final FragmentState currentState = newStatus.getProfile().getState();
-    final boolean stateChanged = currentState != oldState;
-
-    if (inTerminalState) {
-      // already in a terminal state. This shouldn't happen.
-      logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s",
-          QueryIdHelper.getQueryIdentifier(getHandle()), oldState, currentState));
-      return false;
-    }
-
-    this.lastStatusUpdate = time;
+    lastStatusUpdate = time;
     if (madeProgress(status, newStatus)) {
-      this.lastProgress = time;
+      lastProgress = time;
     }
     status = newStatus;
-
-    return stateChanged;
   }
 
   public FragmentState getState() {

http://git-wip-us.apache.org/repos/asf/drill/blob/d9452d97/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 71b77c6..a7dfe85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.SchemaUserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
@@ -50,10 +51,7 @@ import org.apache.drill.exec.store.sys.PStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.work.EndpointListener;
-import org.apache.drill.exec.work.WorkManager;
 import org.apache.drill.exec.work.foreman.Foreman.StateListener;
-import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
 import org.apache.drill.exec.work.fragment.StatusReporter;
 
@@ -126,12 +124,31 @@ public class QueryManager {
     }
   }
 
+  private static boolean isTerminal(final FragmentState state) {
+    return state == FragmentState.FAILED
+        || state == FragmentState.FINISHED
+        || state == FragmentState.CANCELLED;
+  }
+
   private boolean updateFragmentStatus(final FragmentStatus fragmentStatus) {
     final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
     final int majorFragmentId = fragmentHandle.getMajorFragmentId();
     final int minorFragmentId = fragmentHandle.getMinorFragmentId();
     final FragmentData data = fragmentDataMap.get(majorFragmentId).get(minorFragmentId);
-    return data.setStatus(fragmentStatus);
+
+    final FragmentState oldState = data.getState();
+    final boolean inTerminalState = isTerminal(oldState);
+    final FragmentState currentState = fragmentStatus.getProfile().getState();
+
+    if (inTerminalState || (oldState == FragmentState.CANCELLATION_REQUESTED && !isTerminal(currentState))) {
+      // Already in a terminal state, or invalid state transition from CANCELLATION_REQUESTED. This shouldn't happen.
+      logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s",
+          QueryIdHelper.getQueryIdentifier(fragmentHandle), oldState, currentState));
+      return false;
+    }
+
+    data.setStatus(fragmentStatus);
+    return oldState != currentState;
   }
 
   private void fragmentDone(final FragmentStatus status) {
