This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 28fedeaa3d8 [FLINK-37016] ClusterEntrypoint can be closed before initialization
28fedeaa3d8 is described below

commit 28fedeaa3d854f34c63b019ab3d0da9079d0830a
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Mon Jan 6 14:38:24 2025 +0100

    [FLINK-37016] ClusterEntrypoint can be closed before initialization
---
 .../org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 4 +++-
 .../apache/flink/runtime/entrypoint/ClusterEntrypointTest.java | 8 ++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 5dfeb4ab8b6..32fd515ed4b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -594,7 +594,9 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                             shutDownApplicationFuture, () -> stopClusterServices(cleanupHaData));
 
             final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
-                    FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close);
+                    rpcSystem != null
+                            ? FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close)
+                            : FutureUtils.completedVoidFuture();
 
             final CompletableFuture<Void> cleanupDirectoriesFuture =
                     FutureUtils.runAfterwards(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 8b2424e8694..86796fdeba5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -69,6 +69,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
@@ -131,6 +132,13 @@ public class ClusterEntrypointTest extends TestLogger {
                 ExceptionThrowingDelegationTokenReceiver.onNewTokensObtainedCallCount.get(), is(1));
     }
 
+    @Test
+    public void testCloseAsyncDoesNotFailBeforeInitialization() {
+        TestingEntryPoint entryPoint = new TestingEntryPoint.Builder().build();
+
+        assertThatCode(() -> entryPoint.closeAsync().join()).doesNotThrowAnyException();
+    }
+
     @Test
     public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();