tillrohrmann commented on a change in pull request #15396:
URL: https://github.com/apache/flink/pull/15396#discussion_r607786784
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
##########
@@ -22,50 +22,279 @@
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import
org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.ConfigurationException;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/** Tests for the {@link ClusterEntrypoint}. */
public class ClusterEntrypointTest {
Review comment:
I know it is unrelated but maybe we can still change it: `extends
TestLogger` is missing.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
##########
@@ -22,50 +22,279 @@
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import
org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.ConfigurationException;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/** Tests for the {@link ClusterEntrypoint}. */
public class ClusterEntrypointTest {
+ private static final long TIMEOUT_MS = 3000;
+
+ private Configuration flinkConfig;
+ private ExecutorService testingExecutor;
+
+ @Before
+ public void before() {
+ flinkConfig = new Configuration();
+ testingExecutor =
+ Executors.newSingleThreadExecutor(new
ExecutorThreadFactory("testing-executor"));
+ }
+
+ @After
+ public void tearDown() {
+ testingExecutor.shutdownNow();
+ }
+
@Test(expected = IllegalConfigurationException.class)
public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
- Configuration configuration = new Configuration();
- configuration.set(JobManagerOptions.SCHEDULER_MODE,
SchedulerExecutionMode.REACTIVE);
- new MockEntryPoint(configuration);
+ flinkConfig.set(JobManagerOptions.SCHEDULER_MODE,
SchedulerExecutionMode.REACTIVE);
+ new TestingEntryPoint.Builder().setConfiguration(flinkConfig).build();
fail("Entrypoint initialization is supposed to fail");
}
- private static class MockEntryPoint extends ClusterEntrypoint {
+ @Test
+ public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
Review comment:
Should we also add a test which ensures that the HA data is cleaned up
and we deregister the application if the `DispatcherResourceManagerComponent`
completes the `shutdownFuture` successfully?
##########
File path:
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
##########
@@ -599,14 +599,14 @@ public boolean accept(File dir, String name) {
}
public static boolean verifyStringsInNamedLogFiles(
- final String[] mustHave, final String fileName) {
- List<String> mustHaveList = Arrays.asList(mustHave);
- File cwd = new File("target/" +
YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
+ final String[] mustHave, final String applicationId, final String
fileName) {
Review comment:
I would suggest passing in an `ApplicationID` instead of a `String`. This
gives a bit better type safety.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
##########
@@ -22,50 +22,279 @@
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import
org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.ConfigurationException;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/** Tests for the {@link ClusterEntrypoint}. */
public class ClusterEntrypointTest {
+ private static final long TIMEOUT_MS = 3000;
+
+ private Configuration flinkConfig;
+ private ExecutorService testingExecutor;
+
+ @Before
+ public void before() {
+ flinkConfig = new Configuration();
+ testingExecutor =
+ Executors.newSingleThreadExecutor(new
ExecutorThreadFactory("testing-executor"));
Review comment:
You could use the `TestExecutorResource` here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
##########
@@ -135,6 +135,22 @@ private void registerShutDownFuture() {
}
}
+ /**
+ * Close the web monitor and cluster components. This method will not
deregister the Flink
+ * application from the resource management.
+ *
+ * @return Future which is completed once the shut down
+ */
+ public CompletableFuture<Void> closeAsyncWithoutDeregisteringApplication()
{
+
+ if (isRunning.compareAndSet(true, false)) {
+ return FutureUtils.composeAfterwards(
+ webMonitorEndpoint.closeAsync(), this::closeAsyncInternal);
+ } else {
+ return terminationFuture;
+ }
+ }
Review comment:
I would suggest not duplicating the shut down logic. Having a single
place where the shut down happens has the benefit that only one place needs to
be updated in case of a change. Maybe we can have an internal method which
takes an additional shutdown `Runnable`: in the deregister-application case it
calls `deregisterApplication()`, and in the non-deregister case it is a no-op.
What do you think?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
##########
@@ -22,50 +22,279 @@
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import
org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.ConfigurationException;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/** Tests for the {@link ClusterEntrypoint}. */
public class ClusterEntrypointTest {
+ private static final long TIMEOUT_MS = 3000;
+
+ private Configuration flinkConfig;
+ private ExecutorService testingExecutor;
+
+ @Before
+ public void before() {
+ flinkConfig = new Configuration();
+ testingExecutor =
+ Executors.newSingleThreadExecutor(new
ExecutorThreadFactory("testing-executor"));
+ }
+
+ @After
+ public void tearDown() {
+ testingExecutor.shutdownNow();
+ }
+
@Test(expected = IllegalConfigurationException.class)
public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
- Configuration configuration = new Configuration();
- configuration.set(JobManagerOptions.SCHEDULER_MODE,
SchedulerExecutionMode.REACTIVE);
- new MockEntryPoint(configuration);
+ flinkConfig.set(JobManagerOptions.SCHEDULER_MODE,
SchedulerExecutionMode.REACTIVE);
+ new TestingEntryPoint.Builder().setConfiguration(flinkConfig).build();
fail("Entrypoint initialization is supposed to fail");
}
- private static class MockEntryPoint extends ClusterEntrypoint {
+ @Test
+ public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
+ final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ final CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ final HighAvailabilityServices testingHaService =
+ new TestingHighAvailabilityServicesBuilder()
+ .setCloseFuture(closeFuture)
+
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+ .build();
+ final TestingEntryPoint testingEntryPoint =
+ new TestingEntryPoint.Builder()
+ .setConfiguration(flinkConfig)
+ .setHighAvailabilityServices(testingHaService)
+ .build();
+
+ final CompletableFuture<ApplicationStatus> appStatusFuture =
+ startClusterEntrypoint(testingEntryPoint);
+
+ testingEntryPoint.closeAsync();
+ assertThat(
+ appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+ is(ApplicationStatus.UNKNOWN));
+ assertThat(closeFuture.isDone(), is(true));
+ assertThat(closeAndCleanupAllDataFuture.isDone(), is(false));
+ }
+
+ @Test
+ public void testCloseAsyncShouldNotDeregisterApp() throws Exception {
+ final CompletableFuture<Void> deregisterFuture = new
CompletableFuture<>();
+ final TestingResourceManagerFactory testingResourceManagerFactory =
+ new TestingResourceManagerFactory.Builder()
+ .setInternalDeregisterApplicationConsumer(
+ (ignored1, ignored2) ->
deregisterFuture.complete(null))
+ .build();
+ final TestingEntryPoint testingEntryPoint =
+ new TestingEntryPoint.Builder()
+ .setConfiguration(flinkConfig)
+
.setResourceManagerFactory(testingResourceManagerFactory)
+ .build();
+
+ final CompletableFuture<ApplicationStatus> appStatusFuture =
+ startClusterEntrypoint(testingEntryPoint);
- protected MockEntryPoint(Configuration configuration) {
+ testingEntryPoint.closeAsync();
+ assertThat(
+ appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+ is(ApplicationStatus.UNKNOWN));
+ assertThat(deregisterFuture.isDone(), is(false));
+ }
+
+ private CompletableFuture<ApplicationStatus> startClusterEntrypoint(
+ TestingEntryPoint testingEntryPoint) throws Exception {
+ testingEntryPoint.startCluster();
+ return FutureUtils.supplyAsync(
+ () -> testingEntryPoint.getTerminationFuture().get(),
testingExecutor);
+ }
+
+ private static class TestingEntryPoint extends ClusterEntrypoint {
+
+ private final HighAvailabilityServices haService;
+
+ private final ResourceManagerFactory<ResourceID>
resourceManagerFactory;
+
+ private TestingEntryPoint(
+ Configuration configuration,
+ HighAvailabilityServices haService,
+ ResourceManagerFactory<ResourceID> resourceManagerFactory) {
super(configuration);
+ SignalHandler.register(LOG);
+ this.haService = haService;
+ this.resourceManagerFactory = resourceManagerFactory;
}
@Override
protected DispatcherResourceManagerComponentFactory
createDispatcherResourceManagerComponentFactory(Configuration
configuration)
throws IOException {
- throw new UnsupportedOperationException("Not needed for this
test");
+ return
DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
+ resourceManagerFactory);
}
@Override
protected ExecutionGraphInfoStore
createSerializableExecutionGraphStore(
Configuration configuration, ScheduledExecutor
scheduledExecutor)
throws IOException {
- throw new UnsupportedOperationException("Not needed for this
test");
+ return new MemoryExecutionGraphInfoStore();
+ }
+
+ @Override
+ protected HighAvailabilityServices createHaServices(
+ Configuration configuration, Executor executor) {
+ return haService;
}
@Override
protected boolean supportsReactiveMode() {
return false;
}
+
+ public static final class Builder {
+ private HighAvailabilityServices haService =
+ new TestingHighAvailabilityServicesBuilder().build();
+
+ private ResourceManagerFactory<ResourceID> resourceManagerFactory =
+ StandaloneResourceManagerFactory.getInstance();
+
+ private Configuration configuration = new Configuration();
+
+ public Builder
setHighAvailabilityServices(HighAvailabilityServices haService) {
+ this.haService = haService;
+ return this;
+ }
+
+ public Builder setResourceManagerFactory(
+ ResourceManagerFactory<ResourceID> resourceManagerFactory)
{
+ this.resourceManagerFactory = resourceManagerFactory;
+ return this;
+ }
+
+ public Builder setConfiguration(Configuration configuration) {
+ this.configuration = configuration;
+ return this;
+ }
+
+ public TestingEntryPoint build() {
+ return new TestingEntryPoint(configuration, haService,
resourceManagerFactory);
+ }
+ }
+ }
+
+ private static class TestingResourceManagerFactory extends
ResourceManagerFactory<ResourceID> {
+
+ private final BiConsumer<ApplicationStatus, String>
deregisterAppConsumer;
+
+ private TestingResourceManagerFactory(
+ BiConsumer<ApplicationStatus, String> deregisterAppConsumer) {
+ this.deregisterAppConsumer = deregisterAppConsumer;
+ }
+
+ @Override
+ protected ResourceManager<ResourceID> createResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ FatalErrorHandler fatalErrorHandler,
+ ClusterInformation clusterInformation,
+ @Nullable String webInterfaceUrl,
+ ResourceManagerMetricGroup resourceManagerMetricGroup,
+ ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+ Executor ioExecutor)
+ throws Exception {
+ final SlotManager slotManager =
+ SlotManagerBuilder.newBuilder()
+
.setScheduledExecutor(rpcService.getScheduledExecutor())
+ .build();
+ final JobLeaderIdService jobLeaderIdService =
+ new TestingJobLeaderIdService.Builder().build();
+ return new TestingResourceManager(
+ rpcService,
+ resourceId,
+ highAvailabilityServices,
+ heartbeatServices,
+ slotManager,
+ NoOpResourceManagerPartitionTracker::get,
+ jobLeaderIdService,
+ fatalErrorHandler,
+ resourceManagerMetricGroup) {
Review comment:
We don't have to change it now but creating a `TestingResourceManager`
is quite a heavy operation. It could be simplified by letting the
`DefaultDispatcherResourceManagerComponentFactory` not create a
`ResourceManager` but a `ResourceManagerService`. That way we could instantiate
a `TestingResourceManagerService` which returns a
`TestingResourceManagerGateway` on which we could register the
`deregisterAppConsumer`. The `DefaultDispatcherResourceManagerComponentFactory`
would then also have to use a `ResourceManagerServiceFactory` instead of a
`ResourceManagerFactory`. This could be a follow up issue.
##########
File path:
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
##########
@@ -615,10 +615,15 @@ public boolean accept(File dir, String name) {
if (fileName != null &&
!name.equals(fileName)) {
return false;
}
- File f = new File(dir.getAbsolutePath() + "/"
+ name);
+ final File f = new File(dir.getAbsolutePath(),
name);
+ // Only check the specified application logs
+ if
(!Arrays.asList(f.getAbsolutePath().split(File.separator))
+ .contains(applicationId)) {
+ return false;
+ }
Review comment:
Alternative proposal which does not involve so much string manipulation:
```
StreamSupport.stream(f.toPath().spliterator(), false)
.anyMatch(p ->
p.endsWith(applicationId));
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -165,7 +165,7 @@ protected ClusterEntrypoint(Configuration configuration) {
shutDownHook =
ShutdownHookUtil.addShutdownHook(
- this::cleanupDirectories, getClass().getSimpleName(),
LOG);
+ () -> this.closeAsync().join(),
getClass().getSimpleName(), LOG);
Review comment:
I think it would be great to also test this bit here. One idea could be
to use the `TestProcessBuilder` to create a separate process. This process
should then be killed via `SIGTERM`. We would then need something which waits
for a little bit before doing the cleanup. The cleanup is ideally something
like deleting a file which can be verified from the other process. What do you
think?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -615,4 +632,15 @@ public static void runClusterEntrypoint(ClusterEntrypoint
clusterEntrypoint) {
/** Directly stops after the job has finished. */
DETACHED
}
+
+ /** The shutdown reason for {@link #shutDownAsync}. */
+ private enum ShutdownReason {
+ /**
+ * Flink needs to shutdown the cluster internally. We will deregister
the application from
+ * resource management.
+ */
+ FLINK_INTERNAL,
+ /** The cluster entrypoint is killed externally. We will not
deregister the application. */
+ EXTERNAL
+ }
Review comment:
Rather than describing the shutdown reason, I would recommend describing
the cleanup behaviour. Maybe something like
```
enum ShutdownBehaviour {
STOP_APPLICATION,
STOP_PROCESS,
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]