wangyang0918 commented on a change in pull request #15396:
URL: https://github.com/apache/flink/pull/15396#discussion_r610308548
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
##########
@@ -18,54 +18,418 @@
package org.apache.flink.runtime.entrypoint;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
+import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
+import
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import
org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import javax.annotation.Nullable;
+
+import java.io.File;
import java.io.IOException;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
/** Tests for the {@link ClusterEntrypoint}. */
-public class ClusterEntrypointTest {
+public class ClusterEntrypointTest extends TestLogger {
+
+ private static final long TIMEOUT_MS = 3000;
+
+ private Configuration flinkConfig;
+
+ @ClassRule
+ public static final TestExecutorResource<?> TEST_EXECUTOR_RESOURCE =
+ new TestExecutorResource<>(Executors::newSingleThreadExecutor);
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Before
+ public void before() {
+ flinkConfig = new Configuration();
+ }
@Test(expected = IllegalConfigurationException.class)
public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
- Configuration configuration = new Configuration();
- configuration.set(JobManagerOptions.SCHEDULER_MODE,
SchedulerExecutionMode.REACTIVE);
- new MockEntryPoint(configuration);
+ flinkConfig.set(JobManagerOptions.SCHEDULER_MODE,
SchedulerExecutionMode.REACTIVE);
+ new TestingEntryPoint.Builder().setConfiguration(flinkConfig).build();
fail("Entrypoint initialization is supposed to fail");
}
- private static class MockEntryPoint extends ClusterEntrypoint {
+ @Test
+ public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
+ final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ final CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ final HighAvailabilityServices testingHaService =
+ new TestingHighAvailabilityServicesBuilder()
+ .setCloseFuture(closeFuture)
+
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+ .build();
+ final TestingEntryPoint testingEntryPoint =
+ new TestingEntryPoint.Builder()
+ .setConfiguration(flinkConfig)
+ .setHighAvailabilityServices(testingHaService)
+ .build();
+
+ final CompletableFuture<ApplicationStatus> appStatusFuture =
+ startClusterEntrypoint(testingEntryPoint);
+
+ testingEntryPoint.closeAsync();
+ assertThat(
+ appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+ is(ApplicationStatus.UNKNOWN));
+ assertThat(closeFuture.isDone(), is(true));
+ assertThat(closeAndCleanupAllDataFuture.isDone(), is(false));
+ }
+
+ @Test
+ public void testCloseAsyncShouldNotDeregisterApp() throws Exception {
+ final CompletableFuture<Void> deregisterFuture = new
CompletableFuture<>();
+ final TestingResourceManagerFactory testingResourceManagerFactory =
+ new TestingResourceManagerFactory.Builder()
+ .setInternalDeregisterApplicationConsumer(
+ (ignored1, ignored2) ->
deregisterFuture.complete(null))
+ .build();
+ final TestingEntryPoint testingEntryPoint =
+ new TestingEntryPoint.Builder()
+ .setConfiguration(flinkConfig)
+
.setResourceManagerFactory(testingResourceManagerFactory)
+ .build();
+
+ final CompletableFuture<ApplicationStatus> appStatusFuture =
+ startClusterEntrypoint(testingEntryPoint);
+
+ testingEntryPoint.closeAsync();
+ assertThat(
+ appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+ is(ApplicationStatus.UNKNOWN));
+ assertThat(deregisterFuture.isDone(), is(false));
+ }
+
+ @Test
+ public void
testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws
Exception {
+ final CompletableFuture<Void> deregisterFuture = new
CompletableFuture<>();
+ final CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ final CompletableFuture<ApplicationStatus> dispatcherShutDownFuture =
+ new CompletableFuture<>();
+
+ final HighAvailabilityServices testingHaService =
+ new TestingHighAvailabilityServicesBuilder()
+
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+ .build();
+ final TestingResourceManagerFactory testingResourceManagerFactory =
+ new TestingResourceManagerFactory.Builder()
+ .setInternalDeregisterApplicationConsumer(
+ (ignored1, ignored2) ->
deregisterFuture.complete(null))
+ .build();
+ final TestingDispatcherRunnerFactory testingDispatcherRunnerFactory =
+ new TestingDispatcherRunnerFactory.Builder()
+ .setShutDownFuture(dispatcherShutDownFuture)
+ .build();
+
+ final TestingEntryPoint testingEntryPoint =
+ new TestingEntryPoint.Builder()
+ .setConfiguration(flinkConfig)
+
.setResourceManagerFactory(testingResourceManagerFactory)
+
.setDispatcherRunnerFactory(testingDispatcherRunnerFactory)
+ .setHighAvailabilityServices(testingHaService)
+ .build();
+
+ final CompletableFuture<ApplicationStatus> appStatusFuture =
+ startClusterEntrypoint(testingEntryPoint);
+
+ dispatcherShutDownFuture.complete(ApplicationStatus.SUCCEEDED);
- protected MockEntryPoint(Configuration configuration) {
+ assertThat(
+ appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+ is(ApplicationStatus.SUCCEEDED));
+ assertThat(deregisterFuture.isDone(), is(true));
+ assertThat(closeAndCleanupAllDataFuture.isDone(), is(true));
+ }
+
+ @Test
+ public void testCloseAsyncShouldBeExecutedInShutdownHook() throws
Exception {
+ // This test only works on Linux and Mac OS because we are sending
SIGTERM to a
+ // JAVA process via "kill {pid}"
+ assumeTrue(OperatingSystem.isLinux() || OperatingSystem.isMac());
+ final File markerFile = new File(TEMPORARY_FOLDER.getRoot(),
UUID.randomUUID() + ".marker");
+ final TestingClusterEntrypointProcess clusterEntrypointProcess =
+ new TestingClusterEntrypointProcess(markerFile);
+ try {
+ clusterEntrypointProcess.startProcess();
+ final long pid = clusterEntrypointProcess.getProcessId();
+ assertTrue("Cannot determine process ID", pid != -1);
+
+ // wait for the marker file to appear, which means the process is
up properly
+ TestJvmProcess.waitForMarkerFile(markerFile, 30000);
+
+ TestJvmProcess.killProcessWithSigTerm(pid);
+
+ CommonTestUtils.waitUntilCondition(
+ () -> !markerFile.exists(),
+ Deadline.fromNow(Duration.ofSeconds(3)),
+ "markerFile should be deleted in closeAsync shutdownHook");
Review comment:
I am aware of `clusterEntrypointProcess.waitFor()`, but I was not sure whether
it is reliable to depend on `Process#waitFor`, which is why I was using
`CommonTestUtils.waitUntilCondition` before.
I have now switched to `clusterEntrypointProcess.waitFor()` and run the test more
than 100 times locally; it always passes. I will update this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org