Repository: apex-core Updated Branches: refs/heads/master a54e0b7f8 -> 0be03527e
APEXCORE-405 Allow client to query if application was finished. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/0be03527 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/0be03527 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/0be03527 Branch: refs/heads/master Commit: 0be03527e361b023e79c0d9f9e8ef5e2632d64bf Parents: a54e0b7 Author: Thomas Weise <[email protected]> Authored: Thu Nov 24 18:22:26 2016 -0800 Committer: Thomas Weise <[email protected]> Committed: Thu Nov 24 18:22:26 2016 -0800 ---------------------------------------------------------------------- .../apache/apex/api/EmbeddedAppLauncher.java | 11 ++-- .../main/java/org/apache/apex/api/Launcher.java | 54 ++++++++--------- .../datatorrent/stram/StramLocalCluster.java | 5 ++ .../apex/engine/EmbeddedAppLauncherImpl.java | 33 +++++------ .../apache/apex/engine/YarnAppLauncherImpl.java | 61 +++++++++++++------- 5 files changed, 91 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java index 8e3e0f6..4ff705b 100644 --- a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java +++ b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java @@ -21,7 +21,6 @@ package org.apache.apex.api; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Attribute; -import com.datatorrent.api.LocalMode; import com.datatorrent.api.StreamingApplication; /** @@ -33,25 +32,25 @@ public abstract class EmbeddedAppLauncher<H extends EmbeddedAppLauncher.Embedded /** * Parameter to specify the time after which the application will be shutdown; pass 0 to run indefinitely. */ - public static final Attribute<Long> RUN_MILLIS = new Attribute<Long>(0L); + public static final Attribute<Long> RUN_MILLIS = new Attribute<>(0L); /** * Parameter to launch application asynchronously and return from launch immediately. */ - public static final Attribute<Boolean> RUN_ASYNC = new Attribute<Boolean>(false); + public static final Attribute<Boolean> RUN_ASYNC = new Attribute<>(false); /** * Parameter to enable or disable heartbeat monitoring. */ - public static final Attribute<Boolean> HEARTBEAT_MONITORING = new Attribute<Boolean>(true); + public static final Attribute<Boolean> HEARTBEAT_MONITORING = new Attribute<>(true); /** * Parameter to serialize DAG before launch. */ - public static final Attribute<Boolean> SERIALIZE_DAG = new Attribute<Boolean>(false); + public static final Attribute<Boolean> SERIALIZE_DAG = new Attribute<>(false); static { - Attribute.AttributeMap.AttributeInitializer.initialize(LocalMode.class); + Attribute.AttributeMap.AttributeInitializer.initialize(EmbeddedAppLauncher.class); } public static EmbeddedAppLauncher newInstance() http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/api/src/main/java/org/apache/apex/api/Launcher.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/apex/api/Launcher.java b/api/src/main/java/org/apache/apex/api/Launcher.java index 14c365a..0291ab0 100644 --- a/api/src/main/java/org/apache/apex/api/Launcher.java +++ b/api/src/main/java/org/apache/apex/api/Launcher.java @@ -77,8 +77,30 @@ public abstract class Launcher<H extends Launcher.AppHandle> KILL } - // Marker interface - public interface AppHandle {} + /** + * Results of application launch. The client can interact with the running application through this handle. + */ + public interface AppHandle + { + boolean isFinished(); + + /** + * Shutdown the application. + * + * The method takes the application handle and a shutdown mode. The shutdown mode specifies how to shutdown the + * application. + * + * If the mode is AWAIT_TERMINATION, an attempt should be made to shutdown the application in an orderly fashion + * and wait till termination. If the application does not terminate in a reasonable amount of time the + * implementation can forcibly terminate the application. + * + * If the mode is KILL, the application can be killed immediately. + * + * @param shutdownMode The shutdown mode + */ + void shutdown(ShutdownMode shutdownMode) throws LauncherException; + + } /** * Get a launcher instance.<br><br> @@ -138,34 +160,6 @@ public abstract class Launcher<H extends Launcher.AppHandle> */ public abstract H launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap launchParameters) throws LauncherException; - /** - * Shutdown the application and await termination. - * Also see {@link #shutdownApp(AppHandle, ShutdownMode)} - * - * @param app The application handle - */ - public void shutdownApp(H app) throws LauncherException - { - shutdownApp(app, ShutdownMode.AWAIT_TERMINATION); - } - - /** - * Shutdown the application. - * - * The method takes the application handle and a shutdown mode. The shutdown mode specifies how to shutdown the - * application. - * - * If the mode is AWAIT_TERMINATION, an attempt should be made to shutdown the application in an orderly fashion - * and wait till termination. If the application does not terminate in a reasonable amount of time the - * implementation can forcibly terminate the application. - * - * If the mode is KILL, the application can be killed immediately. - * - * @param app The application handle - * @param shutdownMode The shutdown mode - */ - public abstract void shutdownApp(H app, ShutdownMode shutdownMode) throws LauncherException; - protected static <T> T loadService(Class<T> clazz) { ServiceLoader<T> loader = ServiceLoader.load(clazz); http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index 48ed070..14a2827 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -411,6 +411,11 @@ public class StramLocalCluster implements Runnable, Controller appDone = true; } + public boolean isFinished() + { + return appDone; + } + @Override public void setHeartbeatMonitoringEnabled(boolean enabled) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java index 9ace9b5..5930e78 100644 --- a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java +++ b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java @@ -68,7 +68,7 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E } catch (Exception e) { throw new LauncherException(e); } - LocalMode.Controller lc = getController(); + StramLocalCluster lc = getController(); boolean launched = false; if (launchParameters != null) { if (StramUtils.getValueWithDefault(launchParameters, SERIALIZE_DAG)) { @@ -100,16 +100,6 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E } @Override - public void shutdownApp(EmbeddedAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException - { - if (shutdownMode != ShutdownMode.KILL) { - app.controller.shutdown(); - } else { - throw new UnsupportedOperationException("Kill not supported"); - } - } - - @Override public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception { if (app == null && conf == null) { @@ -125,7 +115,7 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E } @Override - public Controller getController() + public StramLocalCluster getController() { try { addLibraryJarsToClasspath(lp); @@ -157,17 +147,26 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E } - /** - * - */ public static class EmbeddedAppHandleImpl implements EmbeddedAppLauncher.EmbeddedAppHandle { - Controller controller; + final StramLocalCluster controller; - public EmbeddedAppHandleImpl(Controller controller) + public EmbeddedAppHandleImpl(StramLocalCluster controller) { this.controller = controller; } + @Override + public boolean isFinished() + { + return controller.isFinished(); + } + + @Override + public void shutdown(ShutdownMode shutdownMode) throws LauncherException + { + controller.shutdown(); + } + } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java index 4f5c8c8..d7a6dc8 100644 --- a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java +++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java @@ -20,14 +20,15 @@ package org.apache.apex.engine; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.apex.api.YarnAppLauncher; import org.apache.apex.engine.util.StreamingAppFactory; +import org.apache.bval.jsr303.util.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -35,7 +36,6 @@ import com.google.common.base.Throwables; import com.datatorrent.api.Attribute; import com.datatorrent.api.StreamingApplication; -import com.datatorrent.stram.StramUtils; import com.datatorrent.stram.client.StramAppLauncher; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; @@ -54,6 +54,7 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar propMapping.put(YarnAppLauncher.QUEUE_NAME, StramAppLauncher.QUEUE_NAME); } + @Override public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException { if (launchParameters != null) { @@ -83,27 +84,21 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar } } - @Override - public void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException + protected void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException { if (shutdownMode == ShutdownMode.KILL) { YarnClient yarnClient = YarnClient.createYarnClient(); try { - String appId = app.getApplicationId(); - ApplicationId applicationId = null; - List<ApplicationReport> applications = StramUtils.getApexApplicationList(yarnClient); - for (ApplicationReport application : applications) { - if (application.getApplicationId().toString().equals(appId)) { - applicationId = application.getApplicationId(); - break; - } - } - if (applicationId == null) { - throw new LauncherException("Application " + appId + " not found"); + ApplicationId applicationId = app.appId; + ApplicationReport appReport = yarnClient.getApplicationReport(applicationId); + if (appReport == null) { + throw new LauncherException("Application " + app.getApplicationId() + " not found"); } yarnClient.killApplication(applicationId); } catch (YarnException | IOException e) { throw Throwables.propagate(e); + } finally { + IOUtils.closeQuietly(yarnClient); } } else { throw new UnsupportedOperationException("Orderly shutdown not supported, try kill instead"); @@ -127,12 +122,9 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar } } - /** - * - */ - public static class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle + public class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle { - ApplicationId appId; + final ApplicationId appId; public YarnAppHandleImpl(ApplicationId appId) { @@ -144,5 +136,34 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar { return appId.toString(); } + + @Override + public boolean isFinished() + { + YarnClient yarnClient = YarnClient.createYarnClient(); + try { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport != null) { + if (appReport.getFinalApplicationStatus() == null + || appReport.getFinalApplicationStatus() == FinalApplicationStatus.UNDEFINED) { + return false; + } + } + return true; + } catch (YarnException | IOException e) { + throw Throwables.propagate(e); + } finally { + IOUtils.closeQuietly(yarnClient); + } + } + + @Override + public void shutdown(org.apache.apex.api.Launcher.ShutdownMode shutdownMode) + throws org.apache.apex.api.Launcher.LauncherException + { + shutdownApp(this, shutdownMode); + + } + } }
