This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9ebfc3bbd1b176be745834fe2f9643b48799940d
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Jun 3 05:14:48 2026 +0800

    [fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator (#25910)
    
    (cherry picked from commit 5627c01b1ef04b8424b781eabc4dea6963faf847)
---
 .../metadata/impl/PulsarZooKeeperClient.java       | 37 +++++++++++++++++++---
 .../pulsar/metadata/impl/ZKSessionWatcher.java     | 19 ++++++++++-
 2 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
index 462df69b2ea..39921b0f89f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -122,16 +123,42 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
                         log.info("Reconnecting zookeeper {}.", connectString);
                         // close the previous one
                         closeZkHandle();
+
+                        // ZooKeeper can deliver SyncConnected after createZooKeeper() returns but before zk.set(newZk)
+                        // publishes the new instance. Hold these events until the new instance is published, so child
+                        // watchers never observe a new-session event while PulsarZooKeeperClient still points at the
+                        // old handle.
+                        CountDownLatch newZkSetLatch = new CountDownLatch(1);
+                        Watcher forwardEventsWatcher = event -> {
+                            try {
+                                boolean awaited = newZkSetLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS);
+                                if (!awaited) {
+                                    log.warn("Timed out waiting for ZooKeeper instance to be published before "
+                                            + "forwarding event {}", event);
+                                }
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                log.warn("Interrupted while waiting for ZooKeeper instance to be published, event {}",
+                                        event, e);
+                                return;
+                            }
+                            watcherManager.process(event);
+                        };
+
                         ZooKeeper newZk;
                         try {
-                            newZk = createZooKeeper();
+                            newZk = createZooKeeper(forwardEventsWatcher);
                         } catch (IOException | QuorumPeerConfig.ConfigException e) {
                             log.error("Failed to create zookeeper instance to {} with config path {}",
                                     connectString, configPath, e);
                             throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
                         }
-                        waitForConnection();
+
+                        // Publish the new instance before releasing the forwarding watcher. waitForConnection() must
+                        // happen after countDown(), since it depends on the forwarded SyncConnected event.
                         zk.set(newZk);
+                        newZkSetLatch.countDown();
+                        waitForConnection();
                         log.info("ZooKeeper session {} is created to {}.",
                                 Long.toHexString(newZk.getSessionId()), connectString);
                         return newZk;
@@ -354,12 +381,12 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
         watcherManager.waitForConnection();
     }
 
-    protected ZooKeeper createZooKeeper() throws IOException, QuorumPeerConfig.ConfigException {
+    protected ZooKeeper createZooKeeper(Watcher watcher) throws IOException, QuorumPeerConfig.ConfigException {
         if (null != configPath) {
-            return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode,
+            return new ZooKeeper(connectString, sessionTimeoutMs, watcher, allowReadOnlyMode,
                     new ZKClientConfig(configPath));
         }
-        return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode);
+        return new ZooKeeper(connectString, sessionTimeoutMs, watcher, allowReadOnlyMode);
     }
 
     @Override
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
index a8407210230..cc26231a745 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
@@ -87,6 +87,7 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
     // in the future.
     private void checkConnectionStatus() {
         try {
+            long checkedSessionId = zk.getSessionId();
             CompletableFuture<Watcher.Event.KeeperState> future = new CompletableFuture<>();
             zk.exists("/", false, (StatCallback) (rc, path, ctx, stat) -> {
                 switch (KeeperException.Code.get(rc)) {
@@ -112,7 +113,7 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
                 zkClientState = Watcher.Event.KeeperState.Disconnected;
             }
 
-            checkState(zkClientState);
+            checkStateIfSameSession(checkedSessionId, zkClientState);
         } catch (RejectedExecutionException | InterruptedException e) {
             task.cancel(true);
         } catch (Throwable t) {
@@ -130,6 +131,22 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
         currentStatus = SessionEvent.SessionLost;
     }
 
+    // PulsarZooKeeperClient publishes the new ZooKeeper instance before forwarding the corresponding session event to
+    // watcherManager, so zk.set(newZk) happens-before this watcher observes the new-session event. Keep the session-id
+    // check and state transition in the same synchronized section to prevent stale async probes from racing with that
+    // event and overwriting the state of the newly established session.
+    private synchronized void checkStateIfSameSession(long checkedSessionId,
+                                                      Watcher.Event.KeeperState zkClientState) {
+        long currentSessionId = zk.getSessionId();
+        if (checkedSessionId != currentSessionId) {
+            log.warn("Ignoring ZooKeeper session state from a stale session. checkedSessionId: {},"
+                            + " currentSessionId: {}, zkClientState: {}",
+                    checkedSessionId, currentSessionId, zkClientState);
+            return;
+        }
+        checkState(zkClientState);
+    }
+
     private synchronized void checkState(Watcher.Event.KeeperState zkClientState) {
         switch (zkClientState) {
         case Expired:

Reply via email to