This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new e85bb74aab3 [FLINK-37016] ClusterEntrypoint can be closed before initialization
e85bb74aab3 is described below

commit e85bb74aab37d2bc8252e38c617fd0bc00f230f6
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Mon Jan 6 14:38:24 2025 +0100

    [FLINK-37016] ClusterEntrypoint can be closed before initialization
---
 .../org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java     | 4 +++-
 .../apache/flink/runtime/entrypoint/ClusterEntrypointTest.java     | 8 ++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 122db8d639f..932438843d6 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -592,7 +592,9 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                         shutDownApplicationFuture, () -> stopClusterServices(cleanupHaData));
 
         final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
-                FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close);
+                rpcSystem != null
+                        ? FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close)
+                        : FutureUtils.completedVoidFuture();
 
         final CompletableFuture<Void> cleanupDirectoriesFuture =
                 FutureUtils.runAfterwards(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 8b2424e8694..86796fdeba5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -69,6 +69,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
@@ -131,6 +132,13 @@ public class ClusterEntrypointTest extends TestLogger {
                 ExceptionThrowingDelegationTokenReceiver.onNewTokensObtainedCallCount.get(), is(1));
     }
 
+    @Test
+    public void testCloseAsyncDoesNotFailBeforeInitialization() {
+        TestingEntryPoint entryPoint = new TestingEntryPoint.Builder().build();
+
+        assertThatCode(() -> entryPoint.closeAsync().join()).doesNotThrowAnyException();
+    }
+
     @Test
     public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();