tillrohrmann commented on a change in pull request #15396:
URL: https://github.com/apache/flink/pull/15396#discussion_r609656062



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -498,19 +508,28 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
     }
 
     /**
-     * Deregister the Flink application from the resource management system by 
signalling the {@link
-     * ResourceManager}.
+     * Close cluster components and deregister the Flink application from the 
resource management
+     * system by signalling the {@link ResourceManager}.
      *
      * @param applicationStatus to terminate the application with
+     * @param shutdownBehaviour shutdown behaviour
      * @param diagnostics additional information about the shut down, can be 
{@code null}
      * @return Future which is completed once the shut down
      */
     private CompletableFuture<Void> closeClusterComponent(
-            ApplicationStatus applicationStatus, @Nullable String diagnostics) 
{
+            ApplicationStatus applicationStatus,
+            ShutdownBehaviour shutdownBehaviour,
+            @Nullable String diagnostics) {
         synchronized (lock) {
             if (clusterComponent != null) {
-                return clusterComponent.deregisterApplicationAndClose(
-                        applicationStatus, diagnostics);
+                switch (shutdownBehaviour) {
+                    case STOP_APPLICATION:
+                        return clusterComponent.deregisterApplicationAndClose(

Review comment:
       Maybe we could call this method `stopApplication`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
##########
@@ -18,54 +18,419 @@
 
 package org.apache.flink.runtime.entrypoint;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
+import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
 import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import 
org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
+import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 /** Tests for the {@link ClusterEntrypoint}. */
-public class ClusterEntrypointTest {
+public class ClusterEntrypointTest extends TestLogger {
+
+    private static final long TIMEOUT_MS = 3000;
+
+    private Configuration flinkConfig;
+
+    @ClassRule
+    public static final TestExecutorResource<?> TEST_EXECUTOR_RESOURCE =
+            new TestExecutorResource<>(Executors::newSingleThreadExecutor);
+
+    @Before
+    public void before() {
+        flinkConfig = new Configuration();
+    }
 
     @Test(expected = IllegalConfigurationException.class)
     public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
-        Configuration configuration = new Configuration();
-        configuration.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
-        new MockEntryPoint(configuration);
+        flinkConfig.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+        new TestingEntryPoint.Builder().setConfiguration(flinkConfig).build();
         fail("Entrypoint initialization is supposed to fail");
     }
 
-    private static class MockEntryPoint extends ClusterEntrypoint {
+    @Test
+    public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        .setCloseFuture(closeFuture)
+                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(closeFuture.isDone(), is(true));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(false));
+    }
+
+    @Test
+    public void testCloseAsyncShouldNotDeregisterApp() throws Exception {
+        final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInternalDeregisterApplicationConsumer(
+                                (ignored1, ignored2) -> 
deregisterFuture.complete(null))
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        
.setResourceManagerFactory(testingResourceManagerFactory)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(deregisterFuture.isDone(), is(false));
+    }
+
+    @Test
+    public void 
testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws 
Exception {
+        final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final CompletableFuture<ApplicationStatus> dispatcherShutDownFuture =
+                new CompletableFuture<>();
+
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInternalDeregisterApplicationConsumer(
+                                (ignored1, ignored2) -> 
deregisterFuture.complete(null))
+                        .build();
+        final TestingDispatcherRunnerFactory testingDispatcherRunnerFactory =
+                new TestingDispatcherRunnerFactory.Builder()
+                        .setShutDownFuture(dispatcherShutDownFuture)
+                        .build();
+
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        
.setResourceManagerFactory(testingResourceManagerFactory)
+                        
.setDispatcherRunnerFactory(testingDispatcherRunnerFactory)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        dispatcherShutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.SUCCEEDED));
+        assertThat(deregisterFuture.isDone(), is(true));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(true));
+    }
+
+    @Test
+    public void testCloseAsyncShouldBeExecutedInShutdownHook() throws 
Exception {
+        assumeTrue(OperatingSystem.isLinux() || OperatingSystem.isMac());
+        final File markerFile =
+                new File(
+                        EnvironmentInformation.getTemporaryFileDirectory(),
+                        UUID.randomUUID() + ".marker");

Review comment:
       Let's use `@ClassRule TemporaryFolder` for this.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
##########
@@ -18,54 +18,419 @@
 
 package org.apache.flink.runtime.entrypoint;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
+import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
 import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import 
org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
+import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 /** Tests for the {@link ClusterEntrypoint}. */
-public class ClusterEntrypointTest {
+public class ClusterEntrypointTest extends TestLogger {
+
+    private static final long TIMEOUT_MS = 3000;
+
+    private Configuration flinkConfig;
+
+    @ClassRule
+    public static final TestExecutorResource<?> TEST_EXECUTOR_RESOURCE =
+            new TestExecutorResource<>(Executors::newSingleThreadExecutor);
+
+    @Before
+    public void before() {
+        flinkConfig = new Configuration();
+    }
 
     @Test(expected = IllegalConfigurationException.class)
     public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
-        Configuration configuration = new Configuration();
-        configuration.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
-        new MockEntryPoint(configuration);
+        flinkConfig.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+        new TestingEntryPoint.Builder().setConfiguration(flinkConfig).build();
         fail("Entrypoint initialization is supposed to fail");
     }
 
-    private static class MockEntryPoint extends ClusterEntrypoint {
+    @Test
+    public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        .setCloseFuture(closeFuture)
+                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(closeFuture.isDone(), is(true));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(false));
+    }
+
+    @Test
+    public void testCloseAsyncShouldNotDeregisterApp() throws Exception {
+        final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInternalDeregisterApplicationConsumer(
+                                (ignored1, ignored2) -> 
deregisterFuture.complete(null))
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        
.setResourceManagerFactory(testingResourceManagerFactory)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(deregisterFuture.isDone(), is(false));
+    }
+
+    @Test
+    public void 
testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws 
Exception {
+        final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final CompletableFuture<ApplicationStatus> dispatcherShutDownFuture =
+                new CompletableFuture<>();
+
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInternalDeregisterApplicationConsumer(
+                                (ignored1, ignored2) -> 
deregisterFuture.complete(null))
+                        .build();
+        final TestingDispatcherRunnerFactory testingDispatcherRunnerFactory =
+                new TestingDispatcherRunnerFactory.Builder()
+                        .setShutDownFuture(dispatcherShutDownFuture)
+                        .build();
+
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        
.setResourceManagerFactory(testingResourceManagerFactory)
+                        
.setDispatcherRunnerFactory(testingDispatcherRunnerFactory)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        dispatcherShutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.SUCCEEDED));
+        assertThat(deregisterFuture.isDone(), is(true));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(true));
+    }
+
+    @Test
+    public void testCloseAsyncShouldBeExecutedInShutdownHook() throws 
Exception {
+        assumeTrue(OperatingSystem.isLinux() || OperatingSystem.isMac());

Review comment:
       Let's add a short message stating why this only works on Linux and Mac 
and not on Windows.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
##########
@@ -18,54 +18,419 @@
 
 package org.apache.flink.runtime.entrypoint;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
+import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
 import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import 
org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
+import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 /** Tests for the {@link ClusterEntrypoint}. */
-public class ClusterEntrypointTest {
+public class ClusterEntrypointTest extends TestLogger {
+
+    private static final long TIMEOUT_MS = 3000;
+
+    private Configuration flinkConfig;
+
+    @ClassRule
+    public static final TestExecutorResource<?> TEST_EXECUTOR_RESOURCE =
+            new TestExecutorResource<>(Executors::newSingleThreadExecutor);
+
+    @Before
+    public void before() {
+        flinkConfig = new Configuration();
+    }
 
     @Test(expected = IllegalConfigurationException.class)
     public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
-        Configuration configuration = new Configuration();
-        configuration.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
-        new MockEntryPoint(configuration);
+        flinkConfig.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+        new TestingEntryPoint.Builder().setConfiguration(flinkConfig).build();
         fail("Entrypoint initialization is supposed to fail");
     }
 
-    private static class MockEntryPoint extends ClusterEntrypoint {
+    @Test
+    public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        .setCloseFuture(closeFuture)
+                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(closeFuture.isDone(), is(true));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(false));
+    }
+
+    @Test
+    public void testCloseAsyncShouldNotDeregisterApp() throws Exception {
+        final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInternalDeregisterApplicationConsumer(
+                                (ignored1, ignored2) -> 
deregisterFuture.complete(null))
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        
.setResourceManagerFactory(testingResourceManagerFactory)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(deregisterFuture.isDone(), is(false));
+    }
+
+    @Test
+    public void 
testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws 
Exception {
+        final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final CompletableFuture<ApplicationStatus> dispatcherShutDownFuture =
+                new CompletableFuture<>();
+
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInternalDeregisterApplicationConsumer(
+                                (ignored1, ignored2) -> 
deregisterFuture.complete(null))
+                        .build();
+        final TestingDispatcherRunnerFactory testingDispatcherRunnerFactory =
+                new TestingDispatcherRunnerFactory.Builder()
+                        .setShutDownFuture(dispatcherShutDownFuture)
+                        .build();
+
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        
.setResourceManagerFactory(testingResourceManagerFactory)
+                        
.setDispatcherRunnerFactory(testingDispatcherRunnerFactory)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        dispatcherShutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.SUCCEEDED));
+        assertThat(deregisterFuture.isDone(), is(true));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(true));
+    }
+
+    @Test
+    public void testCloseAsyncShouldBeExecutedInShutdownHook() throws 
Exception {
+        assumeTrue(OperatingSystem.isLinux() || OperatingSystem.isMac());

Review comment:
       I assume this is because of `Runtime.getRuntime().exec("kill ...")`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
##########
@@ -18,54 +18,419 @@
 
 package org.apache.flink.runtime.entrypoint;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
+import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
 import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import 
org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
+import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 /** Tests for the {@link ClusterEntrypoint}. */
-public class ClusterEntrypointTest {
+public class ClusterEntrypointTest extends TestLogger {
+
+    private static final long TIMEOUT_MS = 3000;
+
+    private Configuration flinkConfig;
+
+    @ClassRule
+    public static final TestExecutorResource<?> TEST_EXECUTOR_RESOURCE =
+            new TestExecutorResource<>(Executors::newSingleThreadExecutor);
+
+    @Before
+    public void before() {
+        flinkConfig = new Configuration();
+    }
 
     @Test(expected = IllegalConfigurationException.class)
     public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
-        Configuration configuration = new Configuration();
-        configuration.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
-        new MockEntryPoint(configuration);
+        flinkConfig.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+        new TestingEntryPoint.Builder().setConfiguration(flinkConfig).build();
         fail("Entrypoint initialization is supposed to fail");
     }
 
-    private static class MockEntryPoint extends ClusterEntrypoint {
+    @Test
+    public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        .setCloseFuture(closeFuture)
+                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(closeFuture.isDone(), is(true));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(false));
+    }
+
+    @Test
+    public void testCloseAsyncShouldNotDeregisterApp() throws Exception {
+        final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInternalDeregisterApplicationConsumer(
+                                (ignored1, ignored2) -> 
deregisterFuture.complete(null))
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        
.setResourceManagerFactory(testingResourceManagerFactory)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(deregisterFuture.isDone(), is(false));
+    }
+
+    @Test
+    public void 
testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws 
Exception {
+        final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final CompletableFuture<ApplicationStatus> dispatcherShutDownFuture =
+                new CompletableFuture<>();
+
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInternalDeregisterApplicationConsumer(
+                                (ignored1, ignored2) -> 
deregisterFuture.complete(null))
+                        .build();
+        final TestingDispatcherRunnerFactory testingDispatcherRunnerFactory =
+                new TestingDispatcherRunnerFactory.Builder()
+                        .setShutDownFuture(dispatcherShutDownFuture)
+                        .build();
+
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        
.setResourceManagerFactory(testingResourceManagerFactory)
+                        
.setDispatcherRunnerFactory(testingDispatcherRunnerFactory)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        dispatcherShutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.SUCCEEDED));
+        assertThat(deregisterFuture.isDone(), is(true));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(true));
+    }
+
+    @Test
+    public void testCloseAsyncShouldBeExecutedInShutdownHook() throws 
Exception {
+        assumeTrue(OperatingSystem.isLinux() || OperatingSystem.isMac());
+        final File markerFile =
+                new File(
+                        EnvironmentInformation.getTemporaryFileDirectory(),
+                        UUID.randomUUID() + ".marker");
+        final TestingClusterEntrypointProcess clusterEntrypointProcess =
+                new TestingClusterEntrypointProcess(markerFile);
+        try {
+            clusterEntrypointProcess.startProcess();
+            final long pid = clusterEntrypointProcess.getProcessId();
+            assertTrue("Cannot determine process ID", pid != -1);
+
+            // wait for the marker file to appear, which means the process is 
up properly
+            TestJvmProcess.waitForMarkerFile(markerFile, 30000);
+
+            TestJvmProcess.killProcessWithSigTerm(pid);
+
+            CommonTestUtils.waitUntilCondition(
+                    () -> !markerFile.exists(),
+                    Deadline.fromNow(Duration.ofSeconds(3)),
+                    "markerFile should be deleted in closeAsync shutdownHook");
+        } finally {
+            clusterEntrypointProcess.destroy();
+            //noinspection ResultOfMethodCallIgnored
+            markerFile.delete();

Review comment:
       With `TemporaryFolder` this should no longer be necessary.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.util.SignalHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A testing {@link ClusterEntrypoint} instance running in a separate JVM. */
+public class TestingClusterEntrypointProcess extends TestJvmProcess {
+
+    private final File markerFile;
+
+    public TestingClusterEntrypointProcess(File markerFile) throws Exception {
+        this.markerFile = checkNotNull(markerFile, "marker file");
+    }
+
+    @Override
+    public String getName() {
+        return getClass().getCanonicalName();
+    }
+
+    @Override
+    public String[] getJvmArgs() {
+        return new String[] {markerFile.getAbsolutePath()};
+    }
+
+    @Override
+    public String getEntryPointClassName() {
+        return TestingClusterEntrypointProcessEntryPoint.class.getName();
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getCanonicalName();
+    }
+
+    /** Entrypoint for the testing cluster entrypoint process. */
+    public static class TestingClusterEntrypointProcessEntryPoint {
+
+        private static final Logger LOG =
+                
LoggerFactory.getLogger(TestingClusterEntrypointProcessEntryPoint.class);
+
+        public static void main(String[] args) {
+            try {
+                final File markerFile = new File(args[0]);
+
+                final Configuration config = new Configuration();
+                config.setInteger(JobManagerOptions.PORT, 0);
+                config.setString(RestOptions.BIND_PORT, "0");
+
+                final TestingClusterEntrypoint clusterEntrypoint =
+                        new TestingClusterEntrypoint(config, markerFile);
+
+                SignalHandler.register(LOG);
+                clusterEntrypoint.startCluster();
+                TestJvmProcess.touchFile(markerFile);
+                final int returnCode =
+                        
clusterEntrypoint.getTerminationFuture().get().processExitCode();
+                System.exit(returnCode);
+            } catch (Throwable t) {
+                LOG.error("Failed to start TestingClusterEntrypoint process", 
t);
+                System.exit(1);
+            }
+        }
+    }
+
+    private static class TestingClusterEntrypoint extends ClusterEntrypoint {
+
+        private final File markerFile;
+
+        protected TestingClusterEntrypoint(Configuration configuration, File 
markerFile) {
+            super(configuration);
+            this.markerFile = markerFile;
+        }
+
+        @Override
+        protected DispatcherResourceManagerComponentFactory
+                createDispatcherResourceManagerComponentFactory(Configuration 
configuration)
+                        throws IOException {
+            return 
DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
+                    StandaloneResourceManagerFactory.getInstance());
+        }
+
+        @Override
+        protected ExecutionGraphInfoStore 
createSerializableExecutionGraphStore(
+                Configuration configuration, ScheduledExecutor 
scheduledExecutor)
+                throws IOException {
+            return new MemoryExecutionGraphInfoStore();
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            LOG.info("Deleting markerFile {}", markerFile);
+            //noinspection ResultOfMethodCallIgnored
+            markerFile.delete();

Review comment:
       I would suggest not suppressing the inspection and instead asserting that 
the file could be deleted.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -498,19 +508,28 @@ private Configuration 
generateClusterConfiguration(Configuration configuration)
     }
 
     /**
-     * Deregister the Flink application from the resource management system by 
signalling the {@link
-     * ResourceManager}.
+     * Close cluster components and deregister the Flink application from the 
resource management
+     * system by signalling the {@link ResourceManager}.
      *
      * @param applicationStatus to terminate the application with
+     * @param shutdownBehaviour shutdown behaviour
      * @param diagnostics additional information about the shut down, can be 
{@code null}
      * @return Future which is completed once the shut down
      */
     private CompletableFuture<Void> closeClusterComponent(
-            ApplicationStatus applicationStatus, @Nullable String diagnostics) 
{
+            ApplicationStatus applicationStatus,
+            ShutdownBehaviour shutdownBehaviour,
+            @Nullable String diagnostics) {
         synchronized (lock) {
             if (clusterComponent != null) {
-                return clusterComponent.deregisterApplicationAndClose(
-                        applicationStatus, diagnostics);
+                switch (shutdownBehaviour) {
+                    case STOP_APPLICATION:
+                        return clusterComponent.deregisterApplicationAndClose(
+                                applicationStatus, diagnostics);
+                    case STOP_PROCESS:
+                    default:
+                        return 
clusterComponent.closeAsyncWithoutDeregisteringApplication();

Review comment:
       And this method could be called `stopProcess`. The JavaDocs of these two methods 
could briefly explain how they differ.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.util.SignalHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A testing {@link ClusterEntrypoint} instance running in a separate JVM. */
+public class TestingClusterEntrypointProcess extends TestJvmProcess {
+
+    private final File markerFile;
+
+    public TestingClusterEntrypointProcess(File markerFile) throws Exception {
+        this.markerFile = checkNotNull(markerFile, "marker file");
+    }
+
+    @Override
+    public String getName() {
+        return getClass().getCanonicalName();
+    }
+
+    @Override
+    public String[] getJvmArgs() {
+        return new String[] {markerFile.getAbsolutePath()};
+    }
+
+    @Override
+    public String getEntryPointClassName() {
+        return TestingClusterEntrypointProcessEntryPoint.class.getName();
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getCanonicalName();
+    }
+
+    /** Entrypoint for the testing cluster entrypoint process. */
+    public static class TestingClusterEntrypointProcessEntryPoint {
+
+        private static final Logger LOG =
+                
LoggerFactory.getLogger(TestingClusterEntrypointProcessEntryPoint.class);
+
+        public static void main(String[] args) {
+            try {
+                final File markerFile = new File(args[0]);
+
+                final Configuration config = new Configuration();
+                config.setInteger(JobManagerOptions.PORT, 0);
+                config.setString(RestOptions.BIND_PORT, "0");
+
+                final TestingClusterEntrypoint clusterEntrypoint =
+                        new TestingClusterEntrypoint(config, markerFile);
+
+                SignalHandler.register(LOG);
+                clusterEntrypoint.startCluster();
+                TestJvmProcess.touchFile(markerFile);
+                final int returnCode =
+                        
clusterEntrypoint.getTerminationFuture().get().processExitCode();
+                System.exit(returnCode);
+            } catch (Throwable t) {
+                LOG.error("Failed to start TestingClusterEntrypoint process", 
t);
+                System.exit(1);
+            }
+        }
+    }
+
+    private static class TestingClusterEntrypoint extends ClusterEntrypoint {
+
+        private final File markerFile;
+
+        protected TestingClusterEntrypoint(Configuration configuration, File 
markerFile) {
+            super(configuration);
+            this.markerFile = markerFile;
+        }
+
+        @Override
+        protected DispatcherResourceManagerComponentFactory
+                createDispatcherResourceManagerComponentFactory(Configuration 
configuration)
+                        throws IOException {
+            return 
DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
+                    StandaloneResourceManagerFactory.getInstance());
+        }
+
+        @Override
+        protected ExecutionGraphInfoStore 
createSerializableExecutionGraphStore(
+                Configuration configuration, ScheduledExecutor 
scheduledExecutor)
+                throws IOException {
+            return new MemoryExecutionGraphInfoStore();
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            LOG.info("Deleting markerFile {}", markerFile);
+            //noinspection ResultOfMethodCallIgnored
+            markerFile.delete();
+            return super.closeAsync();

Review comment:
       I think we need to do something like
   
   ```
   final CompletableFuture<Void> deleteFileFuture =
                       CompletableFuture.runAsync(
                               ThrowingRunnable.unchecked(
                                       () -> {
                                           Thread.sleep(50L);
                                           
IOUtils.deleteFileQuietly(markerFile.toPath());
                                       }));
   
               return FutureUtils.composeAfterwards(deleteFileFuture, 
super::closeAsync);
   ```
   
   in order to test that we actually wait on the completion of the returned 
future. Otherwise the deletion will be executed synchronously.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to