This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7a9b5c7233d8ba7f85b6e45bc93168e8615e8d79
Author: zzb <[email protected]>
AuthorDate: Fri May 15 17:07:17 2026 +0800

    [fix][broker] Fix PulsarService.closeAsync where Condition.signalAll was called without holding a lock (#25777)
    
    Co-authored-by: zhaizhibo <[email protected]>
    (cherry picked from commit a20008899b7a89171ae44d40893eb353bb78ab70)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  9 +++++--
 .../pulsar/broker/PulsarServiceCloseTest.java      | 28 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index d643ed8b6fe..a77e09d79ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -747,8 +747,13 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 } else {
                     LOG.warn("Closed with errors", t);
                 }
-                state = State.Closed;
-                isClosedCondition.signalAll();
+                mutex.lock();
+                try {
+                    state = State.Closed;
+                    isClosedCondition.signalAll();
+                } finally {
+                    mutex.unlock();
+                }
                 return null;
             });
             return closeFuture;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
index 683e15c2a02..8b3b8ad9110 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceCloseTest.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pulsar.broker;
 
+import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertTrue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -73,4 +76,29 @@ public class PulsarServiceCloseTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test(timeOut = 60_000)
+    public void testWaitUntilClosedConcurrentWithCloseAsync() throws Exception {
+        // Start closeAsync() - it initiates close and returns a future
+        CompletableFuture<Void> closeFuture = pulsar.closeAsync();
+
+        // Start waitUntilClosed() in a separate thread, ideally BEFORE close completes.
+        // If it gets there in time it is expected to block in mutex.lock() -> await(),
+        // relying on signalAll() to be woken up when close finishes.
+        CompletableFuture<Void> waitFuture = CompletableFuture.runAsync(() -> {
+            try {
+                pulsar.waitUntilClosed();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        });
+
+        try {
+            closeFuture.get(30, TimeUnit.SECONDS);
+            waitFuture.get(30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            fail("Should not throw exception", e);
+        }
+        log.info("waitUntilClosed() returned successfully while closeAsync() was in progress");
+    }
 }

Reply via email to