This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 20447b23848c2f69afef2742e06922fe174ddc47 Author: Till Rohrmann <[email protected]> AuthorDate: Thu Sep 27 17:14:00 2018 +0200 [hotfix] Let ClusterEntrypoint implement AutoCloseableAsync --- .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 13 +++++++++++-- .../AbstractTaskManagerProcessFailureRecoveryTest.java | 7 +------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 5665500..9eaef34 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; @@ -88,7 +89,7 @@ import scala.concurrent.duration.FiniteDuration; * * <p>Specialization of this class can be used for the session mode and the per-job mode */ -public abstract class ClusterEntrypoint implements FatalErrorHandler { +public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler { public static final ConfigOption<String> EXECUTION_MODE = ConfigOptions .key("internal.cluster.execution-mode") @@ -312,6 +313,14 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration)); } + @Override + public CompletableFuture<Void> closeAsync() { + return shutDownAsync( + ApplicationStatus.UNKNOWN, + "Cluster entrypoint has been closed externally.", + true).thenAccept(ignored -> {}); + } + protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) { synchronized (lock) { Throwable exception = null; @@ -392,7 +401,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { return resultConfiguration; } - public CompletableFuture<ApplicationStatus> shutDownAsync( + private CompletableFuture<ApplicationStatus> shutDownAsync( ApplicationStatus applicationStatus, @Nullable String diagnostics, boolean cleanupHaData) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 1374b70..5d7f26b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -24,7 +24,6 @@ import org.apache.flink.configuration.HeartbeatManagerOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; @@ -101,9 +100,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L); jmConfig.setInteger(RestOptions.PORT, restPort); - final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig); - - try { + try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig)) { // check that we run this test only if the java command // is available on this machine String javaCommand = getJavaCommandPath(); @@ -228,8 +225,6 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test if (taskManagerProcess3 != null) { taskManagerProcess3.destroy(); } - - clusterEntrypoint.shutDownAsync(ApplicationStatus.SUCCEEDED, null, true).get(); } }
