This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new 1c6540bfa42 [FLINK-37016] ClusterEntrypoint can be closed before initialization 1c6540bfa42 is described below commit 1c6540bfa42b8824ea264d8b7104ef41f61bd5e1 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Mon Jan 6 14:38:24 2025 +0100 [FLINK-37016] ClusterEntrypoint can be closed before initialization --- .../org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 4 +++- .../apache/flink/runtime/entrypoint/ClusterEntrypointTest.java | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 8c2803fe443..28c66d0c21d 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -592,7 +592,9 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro shutDownApplicationFuture, () -> stopClusterServices(cleanupHaData)); final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture = - FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close); + rpcSystem != null + ? FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close) + : FutureUtils.completedVoidFuture(); final CompletableFuture<Void> cleanupDirectoriesFuture = FutureUtils.runAfterwards( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java index 8b2424e8694..86796fdeba5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java @@ -69,6 +69,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; @@ -131,6 +132,13 @@ public class ClusterEntrypointTest extends TestLogger { ExceptionThrowingDelegationTokenReceiver.onNewTokensObtainedCallCount.get(), is(1)); } + @Test + public void testCloseAsyncDoesNotFailBeforeInitialization() { + TestingEntryPoint entryPoint = new TestingEntryPoint.Builder().build(); + + assertThatCode(() -> entryPoint.closeAsync().join()).doesNotThrowAnyException(); + } + @Test public void testCloseAsyncShouldNotCleanUpHAData() throws Exception { final CompletableFuture<Void> closeFuture = new CompletableFuture<>();