This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 20447b23848c2f69afef2742e06922fe174ddc47
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Sep 27 17:14:00 2018 +0200

    [hotfix] Let ClusterEntrypoint implement AutoCloseableAsync
---
 .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java  | 13 +++++++++++--
 .../AbstractTaskManagerProcessFailureRecoveryTest.java      |  7 +------
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 5665500..9eaef34 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -54,6 +54,7 @@ import 
org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -88,7 +89,7 @@ import scala.concurrent.duration.FiniteDuration;
  *
  * <p>Specialization of this class can be used for the session mode and the 
per-job mode
  */
-public abstract class ClusterEntrypoint implements FatalErrorHandler {
+public abstract class ClusterEntrypoint implements AutoCloseableAsync, 
FatalErrorHandler {
 
        public static final ConfigOption<String> EXECUTION_MODE = ConfigOptions
                .key("internal.cluster.execution-mode")
@@ -312,6 +313,14 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                return new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
        }
 
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               return shutDownAsync(
+                       ApplicationStatus.UNKNOWN,
+                       "Cluster entrypoint has been closed externally.",
+                       true).thenAccept(ignored -> {});
+       }
+
        protected CompletableFuture<Void> stopClusterServices(boolean 
cleanupHaData) {
                synchronized (lock) {
                        Throwable exception = null;
@@ -392,7 +401,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                return resultConfiguration;
        }
 
-       public CompletableFuture<ApplicationStatus> shutDownAsync(
+       private CompletableFuture<ApplicationStatus> shutDownAsync(
                        ApplicationStatus applicationStatus,
                        @Nullable String diagnostics,
                        boolean cleanupHaData) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 1374b70..5d7f26b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -101,9 +100,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 
10000L);
                jmConfig.setInteger(RestOptions.PORT, restPort);
 
-               final StandaloneSessionClusterEntrypoint clusterEntrypoint = 
new StandaloneSessionClusterEntrypoint(jmConfig);
-
-               try {
+               try (final StandaloneSessionClusterEntrypoint clusterEntrypoint 
= new StandaloneSessionClusterEntrypoint(jmConfig)) {
                        // check that we run this test only if the java command
                        // is available on this machine
                        String javaCommand = getJavaCommandPath();
@@ -228,8 +225,6 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        if (taskManagerProcess3 != null) {
                                taskManagerProcess3.destroy();
                        }
-
-                       
clusterEntrypoint.shutDownAsync(ApplicationStatus.SUCCEEDED, null, true).get();
                }
        }
 

Reply via email to