This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5627c01b1ef [fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator (#25910)
5627c01b1ef is described below
commit 5627c01b1ef04b8424b781eabc4dea6963faf847
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Jun 3 05:14:48 2026 +0800
[fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator (#25910)
---
.../metadata/impl/PulsarZooKeeperClient.java | 40 +++++++++++++++++++---
.../pulsar/metadata/impl/ZKSessionWatcher.java | 21 +++++++++++-
2 files changed, 55 insertions(+), 6 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
index 2b39e1bf7ca..22ec0ba62fc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
@@ -122,9 +123,34 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
log.info().attr("connectString",
connectString).log("Reconnecting zookeeper");
// close the previous one
closeZkHandle();
+
+ // ZooKeeper can deliver SyncConnected after createZooKeeper() returns but before zk.set(newZk)
+ // publishes the new instance. Hold these events until the new instance is published, so child
+ // watchers never observe a new-session event while PulsarZooKeeperClient still points at the
+ // old handle.
+ CountDownLatch newZkSetLatch = new CountDownLatch(1);
+ Watcher forwardEventsWatcher = event -> {
+ try {
+ boolean awaited =
newZkSetLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS);
+ if (!awaited) {
+ log.warn().attr("event", event)
+ .log("Timed out waiting for
ZooKeeper instance to be published before "
+ + "forwarding event");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn()
+ .attr("event", event)
+ .exception(e)
+ .log("Interrupted while waiting for
ZooKeeper instance to be published");
+ return;
+ }
+ watcherManager.process(event);
+ };
+
ZooKeeper newZk;
try {
- newZk = createZooKeeper();
+ newZk = createZooKeeper(forwardEventsWatcher);
} catch (IOException |
QuorumPeerConfig.ConfigException e) {
log.error()
.attr("connectString", connectString)
@@ -133,8 +159,12 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
.log("Failed to create zookeeper
instance");
throw
KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
}
- waitForConnection();
+
+ // Publish the new instance before releasing the forwarding watcher. waitForConnection() must
+ // happen after countDown(), since it depends on the forwarded SyncConnected event.
zk.set(newZk);
+ newZkSetLatch.countDown();
+ waitForConnection();
log.info()
.attr("sessionId",
Long.toHexString(newZk.getSessionId()))
.attr("connectString", connectString)
@@ -363,12 +393,12 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
}
@SuppressWarnings("deprecation")
- protected ZooKeeper createZooKeeper() throws IOException,
QuorumPeerConfig.ConfigException {
+ protected ZooKeeper createZooKeeper(Watcher watcher) throws IOException,
QuorumPeerConfig.ConfigException {
if (null != configPath) {
- return new ZooKeeper(connectString, sessionTimeoutMs,
watcherManager, allowReadOnlyMode,
+ return new ZooKeeper(connectString, sessionTimeoutMs, watcher,
allowReadOnlyMode,
new ZKClientConfig(configPath));
}
- return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager,
allowReadOnlyMode);
+ return new ZooKeeper(connectString, sessionTimeoutMs, watcher,
allowReadOnlyMode);
}
@Override
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
index e5726865d2c..08d87eb94b2 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
@@ -87,6 +87,7 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
// in the future.
private void checkConnectionStatus() {
try {
+ long checkedSessionId = zk.getSessionId();
CompletableFuture<Watcher.Event.KeeperState> future = new
CompletableFuture<>();
zk.exists("/", false, (StatCallback) (rc, path, ctx, stat) -> {
switch (KeeperException.Code.get(rc)) {
@@ -112,7 +113,7 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
zkClientState = Watcher.Event.KeeperState.Disconnected;
}
- checkState(zkClientState);
+ checkStateIfSameSession(checkedSessionId, zkClientState);
} catch (RejectedExecutionException | InterruptedException e) {
task.cancel(true);
} catch (Throwable t) {
@@ -130,6 +131,24 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
currentStatus = SessionEvent.SessionLost;
}
+ // PulsarZooKeeperClient publishes the new ZooKeeper instance before forwarding the corresponding session event to
+ // watcherManager, so zk.set(newZk) happens-before this watcher observes the new-session event. Keep the session-id
+ // check and state transition in the same synchronized section to prevent stale async probes from racing with that
+ // event and overwriting the state of the newly established session.
+ private synchronized void checkStateIfSameSession(long checkedSessionId,
+
Watcher.Event.KeeperState zkClientState) {
+ long currentSessionId = zk.getSessionId();
+ if (checkedSessionId != currentSessionId) {
+ log.warn()
+ .attr("checkedSessionId", checkedSessionId)
+ .attr("currentSessionId", currentSessionId)
+ .attr("zkClientState", zkClientState)
+ .log("Ignoring ZooKeeper session state from a stale
session");
+ return;
+ }
+ checkState(zkClientState);
+ }
+
private synchronized void checkState(Watcher.Event.KeeperState
zkClientState) {
switch (zkClientState) {
case Expired: