This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit eefb47514cd063255c67804d83a0e97a1be2964c Author: Oneby Wang <[email protected]> AuthorDate: Wed Jun 3 02:47:24 2026 +0800 [fix][meta] Fix PulsarZooKeeperClient async addWatch callback retry behavior (#25913) (cherry picked from commit be9f97ac0f833f2dc74dc0f4538e647f7376461f) --- .../metadata/impl/PulsarZooKeeperClient.java | 10 ++-- .../apache/pulsar/metadata/MetadataStoreTest.java | 58 ++++++++++++++++++++++ 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index 6a995f20e74..462df69b2ea 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -1163,7 +1163,7 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo } @Override - public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) { + public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object context) { final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) { final VoidCallback vCb = new VoidCallback() { @@ -1174,7 +1174,7 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo if (allowRetry(worker, rc)) { backOffAndRetry(that, worker.nextRetryWaitTime()); } else { - vCb.processResult(rc, basePath, ctx); + cb.processResult(rc, path, context); } } @@ -1184,15 +1184,15 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo void zkRun() { ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { - PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, cb, ctx); + PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, vCb, worker); } else { - zkHandle.addWatch(basePath, watcher, mode, cb, ctx); + zkHandle.addWatch(basePath, watcher, mode, vCb, worker); } } @Override public String toString() { - return String.format("setData (%s, mode = %s)", basePath, mode.name()); + return String.format("addWatch (%s, mode = %s)", basePath, mode.name()); } }; // execute it immediately diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index 9bd2ddd5e8f..14810b19692 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -19,6 +19,12 @@ package org.apache.pulsar.metadata; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -40,6 +46,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -48,6 +55,7 @@ import java.util.function.Supplier; import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStore; @@ -62,6 +70,9 @@ import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStore; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; @@ -538,6 +549,53 @@ public class MetadataStoreTest extends BaseMetadataStoreTest { assertFalse(zooKeeper.getClientConfig().isSaslClientEnabled()); } + @Test + @SuppressWarnings("unchecked") + public void testAsyncAddWatchRetriesWithWrapperCallback() throws Exception { + String path = newKey(); + @Cleanup + PulsarZooKeeperClient zkClient = PulsarZooKeeperClient.newBuilder() + .connectString(zks.getConnectionString()) + .sessionTimeoutMs(3000) + .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(0, 0, 3)) + .build(); + + ZooKeeper mockZk = mock(ZooKeeper.class); + AtomicInteger attempts = new AtomicInteger(); + doAnswer(invocation -> { + // The wrapper callback should consume this recoverable failure and retry the addWatch operation. + int rc = attempts.incrementAndGet() == 1 + ? KeeperException.Code.CONNECTIONLOSS.intValue() + : KeeperException.Code.OK.intValue(); + String callbackPath = invocation.getArgument(0); + VoidCallback callback = invocation.getArgument(3); + Object callbackContext = invocation.getArgument(4); + callback.processResult(rc, callbackPath, callbackContext); + return null; + }).when(mockZk).addWatch(eq(path), any(Watcher.class), eq(AddWatchMode.PERSISTENT_RECURSIVE), + any(VoidCallback.class), any()); + + // Force the Pulsar wrapper to delegate the async addWatch call to our controlled ZooKeeper instance. + var zooKeeperRef = (AtomicReference<ZooKeeper>) WhiteboxImpl.getInternalState(zkClient, "zk"); + zooKeeperRef.set(mockZk); + + CountDownLatch callbackCalled = new CountDownLatch(1); + AtomicInteger callbackRc = new AtomicInteger(Integer.MIN_VALUE); + zkClient.addWatch(path, event -> { + }, AddWatchMode.PERSISTENT_RECURSIVE, (rc, callbackPath, ctx) -> { + callbackRc.set(rc); + callbackCalled.countDown(); + }, null); + + assertTrue(callbackCalled.await(5, TimeUnit.SECONDS)); + + // The caller should only see the final successful result after the retry, not the first CONNECTIONLOSS. + assertEquals(callbackRc.get(), KeeperException.Code.OK.intValue()); + assertEquals(attempts.get(), 2); + verify(mockZk, times(2)).addWatch(eq(path), any(Watcher.class), eq(AddWatchMode.PERSISTENT_RECURSIVE), + any(VoidCallback.class), any()); + } + @Test public void testOxiaLoadConfigFromFile() throws Exception { final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
