DRILL-3072: Update root fragment to not modify the Foreman state directly.
Instead use RPC mechanism to send and receive status updates Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4dcb3e75 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4dcb3e75 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4dcb3e75 Branch: refs/heads/master Commit: 4dcb3e75643b71daa7f458e1824ac7eb7fc10cde Parents: e58a306 Author: vkorukanti <[email protected]> Authored: Wed May 13 19:42:24 2015 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu May 14 21:58:52 2015 -0700 ---------------------------------------------------------------------- .../apache/drill/exec/work/foreman/Foreman.java | 9 ++-- .../drill/exec/work/foreman/QueryManager.java | 47 +++++--------------- 2 files changed, 17 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/4dcb3e75/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 6840cf3..5d07b49 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -205,7 +205,7 @@ public class Foreman implements Runnable { // resume all pauses through query context queryContext.getExecutionControls().unpauseAll(); // resume all pauses through all fragment contexts - queryManager.unpauseExecutingFragments(drillbitContext, rootRunner); + queryManager.unpauseExecutingFragments(drillbitContext); } /** @@ -810,7 +810,7 @@ public class Foreman implements Runnable { assert exception == null; queryManager.markEndTime(); recordNewState(QueryState.CANCELLATION_REQUESTED); - 
queryManager.cancelExecutingFragments(drillbitContext, rootRunner); + queryManager.cancelExecutingFragments(drillbitContext); foremanResult.setCompleted(QueryState.CANCELED); /* * We don't close the foremanResult until we've gotten @@ -833,7 +833,7 @@ public class Foreman implements Runnable { assert exception != null; queryManager.markEndTime(); recordNewState(QueryState.FAILED); - queryManager.cancelExecutingFragments(drillbitContext, rootRunner); + queryManager.cancelExecutingFragments(drillbitContext); foremanResult.setFailed(exception); foremanResult.close(); return; @@ -934,7 +934,8 @@ public class Foreman implements Runnable { queryManager.addFragmentStatusTracker(rootFragment, true); - rootRunner = new FragmentExecutor(rootContext, rootFragment, queryManager.newRootStatusHandler(rootContext), + rootRunner = new FragmentExecutor(rootContext, rootFragment, + queryManager.newRootStatusHandler(rootContext, drillbitContext), rootOperator); final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner); http://git-wip-us.apache.org/repos/asf/drill/blob/4dcb3e75/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index eed4e17..71b77c6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.control.ControlTunnel; import 
org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.sys.PStore; @@ -53,6 +54,7 @@ import org.apache.drill.exec.work.WorkManager; import org.apache.drill.exec.work.foreman.Foreman.StateListener; import org.apache.drill.exec.work.fragment.AbstractStatusReporter; import org.apache.drill.exec.work.fragment.FragmentExecutor; +import org.apache.drill.exec.work.fragment.NonRootStatusReporter; import org.apache.drill.exec.work.fragment.StatusReporter; import com.carrotsearch.hppc.IntObjectOpenHashMap; @@ -176,17 +178,15 @@ public class QueryManager { /** * Stop all fragments with currently *known* active status (active as in SENDING, AWAITING_ALLOCATION, RUNNING). - * (1) Root fragment - * (a) If the root is pending, delegate the cancellation to local work bus. - * (b) If the root is running, cancel the fragment directly. * * For the actual cancel calls for intermediate and leaf fragments, see * {@link org.apache.drill.exec.work.batch.ControlMessageHandler#cancelFragment} + * (1) Root fragment: pending or running, send the cancel signal through a tunnel. * (2) Intermediate fragment: pending or running, send the cancel signal through a tunnel (for local and remote * fragments). The actual cancel is done by delegating the cancel to the work bus. * (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly. 
*/ - void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) { + void cancelExecutingFragments(final DrillbitContext drillbitContext) { final Controller controller = drillbitContext.getController(); for(final FragmentData data : fragmentDataSet) { switch(data.getState()) { @@ -194,19 +194,10 @@ public class QueryManager { case AWAITING_ALLOCATION: case RUNNING: final FragmentHandle handle = data.getHandle(); - if (rootRunner.getContext().getHandle().equals(handle)) { - // Case 1.a: pending root is in the work bus. Delegate the cancel to the work bus. - final boolean removed = drillbitContext.getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle); - // Case 1.b: running root. Cancel directly. - if (!removed) { - rootRunner.cancel(); - } - } else { - final DrillbitEndpoint endpoint = data.getEndpoint(); - // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same? - controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle, + final DrillbitEndpoint endpoint = data.getEndpoint(); + // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same? + controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle, SignalListener.Signal.CANCEL), handle); - } break; case FINISHED: @@ -221,13 +212,9 @@ public class QueryManager { /** * Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before - * sending any message. Resume the root fragment directly and all other (local and remote) fragments through the - * control tunnel. + * sending any message. Resume all fragments through the control tunnel. 
*/ - void unpauseExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) { - if (rootRunner != null) { - rootRunner.unpause(); - } + void unpauseExecutingFragments(final DrillbitContext drillbitContext) { final Controller controller = drillbitContext.getController(); for(final FragmentData data : fragmentDataSet) { final DrillbitEndpoint endpoint = data.getEndpoint(); @@ -447,19 +434,9 @@ public class QueryManager { } } - public StatusReporter newRootStatusHandler(final FragmentContext context) { - return new RootStatusReporter(context); - } - - private class RootStatusReporter extends AbstractStatusReporter { - private RootStatusReporter(final FragmentContext context) { - super(context); - } - - @Override - protected void statusChange(final FragmentHandle handle, final FragmentStatus status) { - fragmentStatusListener.statusUpdate(status); - } + public StatusReporter newRootStatusHandler(final FragmentContext context, final DrillbitContext dContext) { + final ControlTunnel tunnel = dContext.getController().getTunnel(foreman.getQueryContext().getCurrentEndpoint()); + return new NonRootStatusReporter(context, tunnel); } public FragmentStatusListener getFragmentStatusListener(){
