[FLINK-8224] [flip6] Shut down application when job terminated in job mode. This closes #5139.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4ecc7ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4ecc7ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4ecc7ff Branch: refs/heads/master Commit: a4ecc7ffe4ba16a68de06c1053c7916e6082b413 Parents: c1734f4 Author: shuai.xus <[email protected]> Authored: Fri Dec 8 18:02:42 2017 +0800 Committer: Till Rohrmann <[email protected]> Committed: Thu Jan 25 15:33:29 2018 +0100 ---------------------------------------------------------------------- .../clusterframework/MesosResourceManager.java | 4 +++- .../entrypoint/JobClusterEntrypoint.java | 23 +++++++++++++++----- .../resourcemanager/ResourceManager.java | 14 ++++++++---- .../StandaloneResourceManager.java | 6 +++-- .../resourcemanager/TestingResourceManager.java | 4 +++- .../apache/flink/yarn/YarnResourceManager.java | 6 +++-- 6 files changed, 42 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index cabb7d7..8b67257 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -75,6 +75,8 @@ import org.apache.mesos.SchedulerDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -371,7 +373,7 @@ 
public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN @Override protected void shutDownApplication( ApplicationStatus finalStatus, - String optionalDiagnostics) throws ResourceManagerException { + @Nullable String optionalDiagnostics) throws ResourceManagerException { LOG.info("Shutting down and unregistering as a Mesos framework."); Exception exception = null; http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index cb1b086..ede8d13 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -53,6 +54,7 @@ import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedThrowable; import akka.actor.ActorSystem; @@ -258,8 +260,15 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { } } - private void shutDownAndTerminate(boolean cleanupHaData) { + private void shutDownAndTerminate( + 
boolean cleanupHaData, + ApplicationStatus status, + @Nullable String optionalDiagnostics) { try { + if (resourceManager != null) { + resourceManager.shutDownCluster(status, optionalDiagnostics); + } + shutDown(cleanupHaData); } catch (Throwable t) { LOG.error("Could not properly shut down cluster entrypoint.", t); @@ -292,23 +301,27 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { public void jobFinished(JobResult result) { LOG.info("Job({}) finished.", jobId); - shutDownAndTerminate(true); + shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, null); } @Override public void jobFailed(JobResult result) { checkArgument(result.getSerializedThrowable().isPresent()); - LOG.info("Job({}) failed.", jobId, result.getSerializedThrowable().get().getMessage()); + final SerializedThrowable serializedThrowable = result.getSerializedThrowable().get(); + + final String errorMessage = serializedThrowable.getMessage(); + + LOG.info("Job({}) failed: {}.", jobId, errorMessage); - shutDownAndTerminate(false); + shutDownAndTerminate(true, ApplicationStatus.FAILED, errorMessage); } @Override public void jobFinishedByOther() { LOG.info("Job({}) was finished by another JobManager.", jobId); - shutDownAndTerminate(false); + shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, "Job was finished by another master"); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index a0ff5f4..e5fef14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -63,6 
+63,8 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -479,10 +481,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> * Cleanup application and shut down cluster. * * @param finalStatus of the Flink application - * @param optionalDiagnostics for the Flink application + * @param optionalDiagnostics diagnostics message for the Flink application or {@code null} */ @Override - public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) { + public void shutDownCluster( + final ApplicationStatus finalStatus, + @Nullable final String optionalDiagnostics) { log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics); try { @@ -930,10 +934,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> * yet are returned. * * @param finalStatus The application status to report. - * @param optionalDiagnostics An optional diagnostics message. + * @param optionalDiagnostics A diagnostics message or {@code null}. * @throws ResourceManagerException if the application could not be shut down. */ - protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException; + protected abstract void shutDownApplication( + ApplicationStatus finalStatus, + @Nullable String optionalDiagnostics) throws ResourceManagerException; /** * Allocates a resource using the resource profile. 
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index 624f31d..886a046 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -29,11 +29,13 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import javax.annotation.Nullable; + /** * A standalone implementation of the resource manager. Used when the system is started in * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos. * - * This ResourceManager doesn't acquire new resources. + * <p>This ResourceManager doesn't acquire new resources. 
*/ public class StandaloneResourceManager extends ResourceManager<ResourceID> { @@ -67,7 +69,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> { } @Override - protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) { } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java index 0d30822..2af024e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java @@ -29,6 +29,8 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import javax.annotation.Nullable; + /** * Simple {@link ResourceManager} implementation for testing purposes. 
*/ @@ -54,7 +56,7 @@ public class TestingResourceManager extends ResourceManager<ResourceID> { } @Override - protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException { + protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws ResourceManagerException { // noop } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 0fa0dda..910172d 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -252,11 +252,13 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme } @Override - protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + protected void shutDownApplication( + ApplicationStatus finalStatus, + @Nullable String optionalDiagnostics) { // first, de-register from YARN FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus); - log.info("Unregister application from the YARN Resource Manager"); + log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus); try { resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
