(TWILL-180) Reflects YARN application completion status via TwillController
This closes #54 on Github. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/cc79f0d0 Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/cc79f0d0 Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/cc79f0d0 Branch: refs/heads/site Commit: cc79f0d0b23c7394dba146868f769291f782122f Parents: c310b69 Author: Terence Yim <[email protected]> Authored: Mon Apr 3 23:38:58 2017 -0700 Committer: Terence Yim <[email protected]> Committed: Tue Apr 4 02:09:59 2017 -0700 ---------------------------------------------------------------------- .../org/apache/twill/api/ServiceController.java | 32 +++++++++++++++++++- .../AbstractExecutionServiceController.java | 12 ++++++++ .../twill/internal/TwillContainerLauncher.java | 7 +++++ .../apache/twill/yarn/YarnTwillController.java | 15 +++++++-- .../apache/twill/yarn/TaskCompletedTestRun.java | 28 ++++++++++++++++- 5 files changed, 90 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/cc79f0d0/twill-api/src/main/java/org/apache/twill/api/ServiceController.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/ServiceController.java b/twill-api/src/main/java/org/apache/twill/api/ServiceController.java index 1ea86b2..bb46290 100644 --- a/twill-api/src/main/java/org/apache/twill/api/ServiceController.java +++ b/twill-api/src/main/java/org/apache/twill/api/ServiceController.java @@ -22,6 +22,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; /** * This interface is for controlling a remote running service. @@ -57,7 +58,8 @@ public interface ServiceController { * will be returned. 
* * @return a {@link Future} that represents the termination of the service. The future result will be - * this {@link ServiceController}. If the service terminated due to exception, the future will carry the exception. + * this {@link ServiceController}. If the service terminated with a {@link TerminationStatus#FAILED} status, + * calling {@link Future#get()} on the returned future will throw {@link ExecutionException}. */ Future<? extends ServiceController> terminate(); @@ -98,4 +100,32 @@ public interface ServiceController { * @throws ExecutionException if the service terminated due to exception. */ void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException, ExecutionException; + + /** + * Gets the termination status of the application represented by this controller. + * + * @return the termination status, or {@code null} if the application is still running + */ + @Nullable + TerminationStatus getTerminationStatus(); + + /** + * Enum representing the termination status of the application when it completed. + */ + enum TerminationStatus { + /** + * Application completed successfully. + */ + SUCCEEDED, + + /** + * Application was killed explicitly. + */ + KILLED, + + /** + * Application failed.
+ */ + FAILED + } } http://git-wip-us.apache.org/repos/asf/twill/blob/cc79f0d0/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java index 580a88f..3ea27fc 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java @@ -36,6 +36,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; /** * An abstract base class for implementing {@link ServiceController} that deal with Service state transition and @@ -47,6 +48,7 @@ public abstract class AbstractExecutionServiceController implements ServiceContr private final ListenerExecutors listenerExecutors; private final Service serviceDelegate; private final SettableFuture<State> terminationFuture; + private volatile TerminationStatus terminationStatus; protected AbstractExecutionServiceController(RunId runId) { this.runId = runId; @@ -87,6 +89,12 @@ public abstract class AbstractExecutionServiceController implements ServiceContr }); } + @Nullable + @Override + public TerminationStatus getTerminationStatus() { + return terminationStatus; + } + @Override public void onRunning(final Runnable runnable, Executor executor) { addListener(new ServiceListenerAdapter() { @@ -168,6 +176,10 @@ public abstract class AbstractExecutionServiceController implements ServiceContr }; } + protected final void setTerminationStatus(TerminationStatus status) { + this.terminationStatus = status; + } + private final class ServiceDelegate extends 
AbstractIdleService { @Override http://git-wip-us.apache.org/repos/asf/twill/blob/cc79f0d0/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java index 9b6384c..0f8674b 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java @@ -246,6 +246,13 @@ public final class TwillContainerLauncher { @Override public void completed(int exitStatus) { // count down the shutdownLatch to inform any waiting threads that this container is complete + if (exitStatus == 0) { + setTerminationStatus(TerminationStatus.SUCCEEDED); + } else if (exitStatus == 143) { + setTerminationStatus(TerminationStatus.KILLED); + } else { + setTerminationStatus(TerminationStatus.FAILED); + } shutdownLatch.countDown(); synchronized (this) { forceShutDown(); http://git-wip-us.apache.org/repos/asf/twill/blob/cc79f0d0/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java index 6ea7d8f..335d7ec 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java @@ -166,13 +166,14 @@ final class YarnTwillController extends AbstractTwillController implements Twill kill(); } + FinalApplicationStatus finalStatus; // Poll application status from yarn try (ProcessController<YarnApplicationReport> processController = this.processController) { Stopwatch stopWatch = new Stopwatch().start(); long 
maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS); YarnApplicationReport report = processController.getReport(); - FinalApplicationStatus finalStatus = report.getFinalApplicationStatus(); + finalStatus = report.getFinalApplicationStatus(); ApplicationId appId = report.getApplicationId(); while (finalStatus == FinalApplicationStatus.UNDEFINED && stopWatch.elapsedTime(TimeUnit.MILLISECONDS) < maxTime) { @@ -180,18 +181,28 @@ final class YarnTwillController extends AbstractTwillController implements Twill TimeUnit.SECONDS.sleep(1); finalStatus = processController.getReport().getFinalApplicationStatus(); } - LOG.debug("Yarn application {} {} completed with status {}", appName, appId, finalStatus); // Application not finished after max stop time, kill the application if (finalStatus == FinalApplicationStatus.UNDEFINED) { kill(); + finalStatus = FinalApplicationStatus.KILLED; } } catch (Exception e) { LOG.warn("Exception while waiting for application report: {}", e.getMessage(), e); kill(); + finalStatus = FinalApplicationStatus.KILLED; } super.doShutDown(); + + if (finalStatus == FinalApplicationStatus.FAILED) { + // If we know the app status is failed, throw an exception to make this controller go into the error state. + // All other final statuses are not treated as failures, as we can't be sure. + setTerminationStatus(TerminationStatus.FAILED); + throw new RuntimeException(String.format("Yarn application completed with failure %s, %s.", appName, getRunId())); + } + setTerminationStatus(finalStatus == FinalApplicationStatus.SUCCEEDED + ?
TerminationStatus.SUCCEEDED : TerminationStatus.KILLED); } @Override http://git-wip-us.apache.org/repos/asf/twill/blob/cc79f0d0/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java index 51031d4..6fbdc2d 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java @@ -20,6 +20,7 @@ package org.apache.twill.yarn; import com.google.common.base.Throwables; import org.apache.twill.api.AbstractTwillRunnable; import org.apache.twill.api.ResourceSpecification; +import org.apache.twill.api.ServiceController; import org.apache.twill.api.TwillController; import org.apache.twill.api.TwillRunner; import org.apache.twill.api.logging.PrinterLogHandler; @@ -82,7 +83,32 @@ public final class TaskCompletedTestRun extends BaseYarnTest { Assert.assertTrue(runLatch.await(1, TimeUnit.MINUTES)); controller.awaitTerminated(1, TimeUnit.MINUTES); + Assert.assertEquals(ServiceController.TerminationStatus.SUCCEEDED, controller.getTerminationStatus()); + } + + @Test + public void testFailureComplete() throws TimeoutException, ExecutionException, InterruptedException { + TwillRunner twillRunner = getTwillRunner(); + + // Start the app with an invalid ClassLoader. This will cause the AM to fail to start.
+ TwillController controller = twillRunner.prepare(new SleepTask(), + ResourceSpecification.Builder.with() + .setVirtualCores(1) + .setMemory(512, ResourceSpecification.SizeUnit.MEGA) + .setInstances(1).build()) + .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + .setClassLoader("InvalidClassLoader") + .start(); + + final CountDownLatch terminateLatch = new CountDownLatch(1); + controller.onTerminated(new Runnable() { + @Override + public void run() { + terminateLatch.countDown(); + } + }, Threads.SAME_THREAD_EXECUTOR); - TimeUnit.SECONDS.sleep(2); + Assert.assertTrue(terminateLatch.await(2, TimeUnit.MINUTES)); + Assert.assertEquals(ServiceController.TerminationStatus.FAILED, controller.getTerminationStatus()); } }
