Repository: drill Updated Branches: refs/heads/master 48d8a59d1 -> 2506ecf15
DRILL-3450: Moved methods from AbstractStatusReporter and NonRootStatusReporter to FragmentStatusReporter + Removed StatusReporter interface + Refactored FragmentStatusReporter Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2506ecf1 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2506ecf1 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2506ecf1 Branch: refs/heads/master Commit: 2506ecf1551bdb9a7dc6dbf950ba2c5c565eb1f4 Parents: 48d8a59 Author: Sudheesh Katkam <[email protected]> Authored: Wed Jul 1 21:39:53 2015 -0700 Committer: Sudheesh Katkam <[email protected]> Committed: Tue Jul 7 10:40:53 2015 -0700 ---------------------------------------------------------------------- .../exec/work/batch/ControlMessageHandler.java | 6 +- .../apache/drill/exec/work/foreman/Foreman.java | 10 +- .../drill/exec/work/foreman/QueryManager.java | 9 -- .../work/fragment/AbstractStatusReporter.java | 94 --------------- .../exec/work/fragment/FragmentExecutor.java | 30 +++-- .../work/fragment/FragmentStatusReporter.java | 116 +++++++++++++++++++ .../work/fragment/NonRootFragmentManager.java | 4 +- .../work/fragment/NonRootStatusReporter.java | 44 ------- .../exec/work/fragment/StatusReporter.java | 30 ----- 9 files changed, 139 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java index 9f302a2..1c0eb80 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java @@ -44,8 +44,8 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.foreman.Foreman; import org.apache.drill.exec.work.fragment.FragmentExecutor; import org.apache.drill.exec.work.fragment.FragmentManager; +import org.apache.drill.exec.work.fragment.FragmentStatusReporter; import org.apache.drill.exec.work.fragment.NonRootFragmentManager; -import org.apache.drill.exec.work.fragment.NonRootStatusReporter; public class ControlMessageHandler { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class); @@ -130,8 +130,8 @@ public class ControlMessageHandler { final FragmentContext context = new FragmentContext(drillbitContext, fragment, drillbitContext.getFunctionImplementationRegistry()); final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman()); - final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel); - final FragmentExecutor fr = new FragmentExecutor(context, fragment, listener); + final FragmentStatusReporter statusReporter = new FragmentStatusReporter(context, tunnel); + final FragmentExecutor fr = new FragmentExecutor(context, fragment, statusReporter); bee.addFragmentRunner(fr); } else { // isIntermediate, store for incoming data. http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 716fb66..671deae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -69,6 +69,7 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.control.ControlTunnel; import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; @@ -81,6 +82,7 @@ import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.batch.IncomingBuffers; import org.apache.drill.exec.work.fragment.FragmentExecutor; +import org.apache.drill.exec.work.fragment.FragmentStatusReporter; import org.apache.drill.exec.work.fragment.RootFragmentManager; import org.codehaus.jackson.map.ObjectMapper; @@ -125,8 +127,6 @@ public class Foreman implements Runnable { private volatile DistributedLease lease; // used to limit the number of concurrent queries - private FragmentExecutor rootRunner; // root Fragment - private final ExtendedLatch acceptExternalEvents = new ExtendedLatch(); // gates acceptance of external events private final StateListener stateListener = new StateListener(); // source of external events private final ResponseSendListener responseListener = new ResponseSendListener(); @@ -935,8 +935,9 @@ public class Foreman implements Runnable { queryManager.addFragmentStatusTracker(rootFragment, true); - rootRunner = new FragmentExecutor(rootContext, rootFragment, - queryManager.newRootStatusHandler(rootContext, drillbitContext), + final ControlTunnel tunnel = drillbitContext.getController().getTunnel(queryContext.getCurrentEndpoint()); + final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment, + new FragmentStatusReporter(rootContext, tunnel), rootOperator); final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner); @@ -945,7 +946,6 @@ public class Foreman implements Runnable { bee.addFragmentRunner(fragmentManager.getRunnable()); } else { // if we do, record the fragment manager in the workBus. - // TODO aren't we managing our own work? What does this do? It looks like this will never get run drillbitContext.getWorkBus().addFragmentManager(fragmentManager); } } http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index 9318233..554a279 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.UserRemoteException; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -44,7 +43,6 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.control.ControlTunnel; import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.sys.PStore; @@ -52,8 +50,6 @@ import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreProvider; import org.apache.drill.exec.work.EndpointListener; import org.apache.drill.exec.work.foreman.Foreman.StateListener; -import org.apache.drill.exec.work.fragment.NonRootStatusReporter; -import org.apache.drill.exec.work.fragment.StatusReporter; import com.carrotsearch.hppc.IntObjectOpenHashMap; import com.google.common.base.Preconditions; @@ -451,11 +447,6 @@ public class QueryManager { } } - public StatusReporter newRootStatusHandler(final FragmentContext context, final DrillbitContext dContext) { - final ControlTunnel tunnel = dContext.getController().getTunnel(foreman.getQueryContext().getCurrentEndpoint()); - return new NonRootStatusReporter(context, tunnel); - } - public FragmentStatusListener getFragmentStatusListener(){ return fragmentStatusListener; } http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java deleted file mode 100644 index 8a40f1b..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.work.fragment; - -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.proto.BitControl.FragmentStatus; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.UserBitShared.FragmentState; -import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile; -import org.apache.drill.exec.proto.helper.QueryIdHelper; - -public abstract class AbstractStatusReporter implements StatusReporter{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStatusReporter.class); - - private final FragmentContext context; - - public AbstractStatusReporter(final FragmentContext context) { - super(); - this.context = context; - } - - private FragmentStatus.Builder getBuilder(final FragmentState state){ - return getBuilder(context, state, null); - } - - public static FragmentStatus.Builder getBuilder(final FragmentContext context, final FragmentState state, final UserException ex){ - final FragmentStatus.Builder status = FragmentStatus.newBuilder(); - final MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder(); - context.getStats().addMetricsToStatus(b); - b.setState(state); - if(ex != null){ - final boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val; - b.setError(ex.getOrCreatePBError(verbose)); - } - status.setHandle(context.getHandle()); - b.setMemoryUsed(context.getAllocator().getAllocatedMemory()); - b.setMinorFragmentId(context.getHandle().getMinorFragmentId()); - status.setProfile(b); - return status; - } - - @Override - public void stateChanged(final FragmentHandle handle, final FragmentState newState) { - final FragmentStatus.Builder status = getBuilder(newState); - logger.info("State changed for {}. New state: {}", QueryIdHelper.getQueryIdentifier(handle), newState); - switch(newState){ - case AWAITING_ALLOCATION: - case CANCELLATION_REQUESTED: - case CANCELLED: - case FINISHED: - case RUNNING: - statusChange(handle, status.build()); - break; - case SENDING: - // no op. - break; - case FAILED: - // shouldn't get here since fail() should be called. - default: - throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState)); - } - } - - protected abstract void statusChange(FragmentHandle handle, FragmentStatus status); - - @Override - public final void fail(final FragmentHandle handle, final UserException excep) { - final FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, excep); - fail(handle, status); - } - - private void fail(final FragmentHandle handle, final FragmentStatus.Builder statusBuilder) { - statusChange(handle, statusBuilder.build()); - } - - -} http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 9eec782..6d03f01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -58,7 +58,7 @@ public class FragmentExecutor implements Runnable { private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false); private final String fragmentName; private final FragmentContext fragmentContext; - private final StatusReporter listener; + private final FragmentStatusReporter statusReporter; private final DeferredException deferredException = new DeferredException(); private final PlanFragment fragment; private final FragmentRoot rootOperator; @@ -75,12 +75,11 @@ public class FragmentExecutor implements Runnable { * * @param context * @param fragment - * @param listener - * @param rootOperator + * @param statusReporter */ public FragmentExecutor(final FragmentContext context, final PlanFragment fragment, - final StatusReporter listener) { - this(context, fragment, listener, null); + final FragmentStatusReporter statusReporter) { + this(context, fragment, statusReporter, null); } /** @@ -88,13 +87,13 @@ public class FragmentExecutor implements Runnable { * * @param context * @param fragment - * @param listener + * @param statusReporter * @param rootOperator */ public FragmentExecutor(final FragmentContext context, final PlanFragment fragment, - final StatusReporter listener, final FragmentRoot rootOperator) { + final FragmentStatusReporter statusReporter, final FragmentRoot rootOperator) { this.fragmentContext = context; - this.listener = listener; + this.statusReporter = statusReporter; this.fragment = fragment; this.rootOperator = rootOperator; this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle()); @@ -131,9 +130,7 @@ public class FragmentExecutor implements Runnable { return null; } - return AbstractStatusReporter - .getBuilder(fragmentContext, FragmentState.RUNNING, null) - .build(); + return statusReporter.getStatus(FragmentState.RUNNING); } /** @@ -327,9 +324,9 @@ public class FragmentExecutor implements Runnable { .addIdentity(getContext().getIdentity()) .addContext("Fragment", handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId()) .build(logger); - listener.fail(fragmentContext.getHandle(), uex); + statusReporter.fail(uex); } else { - listener.stateChanged(fragmentContext.getHandle(), outcome); + statusReporter.stateChanged(outcome); } } @@ -362,7 +359,6 @@ public class FragmentExecutor implements Runnable { } private synchronized boolean updateState(FragmentState target) { - final FragmentHandle handle = fragmentContext.getHandle(); final FragmentState current = fragmentState.get(); logger.info(fragmentName + ": State change requested {} --> {}", current, target); switch (target) { @@ -372,7 +368,7 @@ public class FragmentExecutor implements Runnable { case AWAITING_ALLOCATION: case RUNNING: fragmentState.set(target); - listener.stateChanged(handle, target); + statusReporter.stateChanged(target); return true; default: @@ -390,7 +386,7 @@ public class FragmentExecutor implements Runnable { case FAILED: if(!isTerminal(current)){ fragmentState.set(target); - // don't notify listener until we finalize this terminal state. + // don't notify reporter until we finalize this terminal state. return true; } else if (current == FragmentState.FAILED) { // no warn since we can call fail multiple times. @@ -406,7 +402,7 @@ public class FragmentExecutor implements Runnable { case RUNNING: if(current == FragmentState.AWAITING_ALLOCATION){ fragmentState.set(target); - listener.stateChanged(handle, target); + statusReporter.stateChanged(target); return true; }else{ errorStateChange(current, target); http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java new file mode 100644 index 0000000..3dd9dc5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.fragment; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.proto.BitControl.FragmentStatus; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.UserBitShared.FragmentState; +import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.rpc.control.ControlTunnel; + +/** + * The status reporter is responsible for receiving changes in fragment state and propagating the status back to the + * Foreman through a control tunnel. + */ +public class FragmentStatusReporter { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporter.class); + + private final FragmentContext context; + private final ControlTunnel tunnel; + + public FragmentStatusReporter(final FragmentContext context, final ControlTunnel tunnel) { + this.context = context; + this.tunnel = tunnel; + } + + /** + * Returns a {@link FragmentStatus} with the given state. {@link FragmentStatus} has additional information like + * metrics, etc. that is gathered from the {@link FragmentContext}. + * + * @param state the state to include in the status + * @return the status + */ + FragmentStatus getStatus(final FragmentState state) { + return getStatus(state, null); + } + + private FragmentStatus getStatus(final FragmentState state, final UserException ex) { + final FragmentStatus.Builder status = FragmentStatus.newBuilder(); + final MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder(); + context.getStats().addMetricsToStatus(b); + b.setState(state); + if (ex != null) { + final boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val; + b.setError(ex.getOrCreatePBError(verbose)); + } + status.setHandle(context.getHandle()); + b.setMemoryUsed(context.getAllocator().getAllocatedMemory()); + b.setMinorFragmentId(context.getHandle().getMinorFragmentId()); + status.setProfile(b); + return status.build(); + } + + /** + * Reports the state change to the Foreman. The state is wrapped in a {@link FragmentStatus} that has additional + * information like metrics, etc. This additional information is gathered from the {@link FragmentContext}. + * NOTE: Use {@link #fail} to report state change to {@link FragmentState#FAILED}. + * + * @param newState the new state + */ + void stateChanged(final FragmentState newState) { + final FragmentStatus status = getStatus(newState, null); + logger.info("{}: State to report: {}", QueryIdHelper.getQueryIdentifier(context.getHandle()), newState); + switch (newState) { + case AWAITING_ALLOCATION: + case CANCELLATION_REQUESTED: + case CANCELLED: + case FINISHED: + case RUNNING: + sendStatus(status); + break; + case SENDING: + // no op. + break; + case FAILED: + // shouldn't get here since fail() should be called. + default: + throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState)); + } + } + + private void sendStatus(final FragmentStatus status) { + tunnel.sendFragmentStatus(status); + } + + /** + * {@link FragmentStatus} with the {@link FragmentState#FAILED} state is reported to the Foreman. The + * {@link FragmentStatus} has additional information like metrics, etc. that is gathered from the + * {@link FragmentContext}. + * + * @param ex the exception related to the failure + */ + void fail(final UserException ex) { + final FragmentStatus status = getStatus(FragmentState.FAILED, ex); + sendStatus(status); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java index 77440c5..3fc757c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java @@ -55,8 +55,8 @@ public class NonRootFragmentManager implements FragmentManager { this.handle = fragment.getHandle(); this.context = new FragmentContext(context, fragment, context.getFunctionImplementationRegistry()); this.buffers = new IncomingBuffers(fragment, this.context); - final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel( - fragment.getForeman())); + final FragmentStatusReporter reporter = new FragmentStatusReporter(this.context, + context.getController().getTunnel(fragment.getForeman())); this.runner = new FragmentExecutor(this.context, fragment, reporter); this.context.setBuffers(buffers); http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java deleted file mode 100644 index 71da12b..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootStatusReporter.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.work.fragment; - -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.proto.BitControl.FragmentStatus; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.rpc.control.ControlTunnel; - -/** - * For all non root fragments, status will be reported back to the foreman through a control tunnel. - */ -public class NonRootStatusReporter extends AbstractStatusReporter{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootStatusReporter.class); - - private final ControlTunnel tunnel; - - public NonRootStatusReporter(FragmentContext context, ControlTunnel tunnel) { - super(context); - this.tunnel = tunnel; - } - - @Override - protected void statusChange(FragmentHandle handle, FragmentStatus status) { - logger.debug("Sending status change message message to remote node: " + status); - tunnel.sendFragmentStatus(status); - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/2506ecf1/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java deleted file mode 100644 index 424e7fb..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/StatusReporter.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.work.fragment; - -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.UserBitShared.FragmentState; - -/** - * The status handler is responsible for receiving changes in fragment status and propagating them back to the foreman. - */ -public interface StatusReporter { - void fail(FragmentHandle handle, UserException excep); - void stateChanged(FragmentHandle handle, FragmentState newState); -}
