This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e85bb74aab3 [FLINK-37016] ClusterEntrypoint can be closed before 
initialization
e85bb74aab3 is described below

commit e85bb74aab37d2bc8252e38c617fd0bc00f230f6
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Mon Jan 6 14:38:24 2025 +0100

    [FLINK-37016] ClusterEntrypoint can be closed before initialization
---
 .../org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java    | 4 +++-
 .../apache/flink/runtime/entrypoint/ClusterEntrypointTest.java    | 8 ++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 122db8d639f..932438843d6 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -592,7 +592,9 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                             shutDownApplicationFuture, () -> 
stopClusterServices(cleanupHaData));
 
             final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
-                    FutureUtils.runAfterwards(serviceShutdownFuture, 
rpcSystem::close);
+                    rpcSystem != null
+                            ? FutureUtils.runAfterwards(serviceShutdownFuture, 
rpcSystem::close)
+                            : FutureUtils.completedVoidFuture();
 
             final CompletableFuture<Void> cleanupDirectoriesFuture =
                     FutureUtils.runAfterwards(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 8b2424e8694..86796fdeba5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -69,6 +69,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
@@ -131,6 +132,13 @@ public class ClusterEntrypointTest extends TestLogger {
                 
ExceptionThrowingDelegationTokenReceiver.onNewTokensObtainedCallCount.get(), 
is(1));
     }
 
+    @Test
+    public void testCloseAsyncDoesNotFailBeforeInitialization() {
+        TestingEntryPoint entryPoint = new TestingEntryPoint.Builder().build();
+
+        assertThatCode(() -> 
entryPoint.closeAsync().join()).doesNotThrowAnyException();
+    }
+
     @Test
     public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

Reply via email to