This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 92b7cdc7f32 [fix][test] Fix flaky 
MetadataStoreTest.testThreadSwitchOfZkMetadataStore (#25347)
92b7cdc7f32 is described below

commit 92b7cdc7f328afc4f262f66f465a07be96690c5b
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 18 12:13:05 2026 -0700

    [fix][test] Fix flaky MetadataStoreTest.testThreadSwitchOfZkMetadataStore 
(#25347)
---
 .../metadata/impl/AbstractMetadataStore.java       |  1 +
 .../apache/pulsar/metadata/MetadataStoreTest.java  | 34 +++++++++++++---------
 2 files changed, 22 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 50e53038f3e..e71c6ff2753 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -84,6 +84,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     private final CopyOnWriteArrayList<Consumer<SessionEvent>> 
sessionListeners = new CopyOnWriteArrayList<>();
     protected final String metadataStoreName;
     private final OrderedExecutor serDesExecutor;
+    @Getter
     private final ExecutorService eventExecutor;
     private final ScheduledExecutorService schedulerExecutor;
     private final AsyncLoadingCache<String, List<String>> childrenCache;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 061bccad931..259261b18c6 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -40,6 +40,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -59,6 +60,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.pulsar.metadata.impl.DualMetadataStore;
 import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -473,8 +475,10 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         MetadataStoreConfig config = builder.build();
         @Cleanup
         DualMetadataStore store = (DualMetadataStore) 
MetadataStoreFactory.create(zks.getConnectionString(), config);
-        ZooKeeper zkClient = ((ZKMetadataStore) 
store.getSourceStore()).getZkClient();
+        AbstractMetadataStore sourceStore = (AbstractMetadataStore) 
store.getSourceStore();
+        ZooKeeper zkClient = ((ZKMetadataStore) sourceStore).getZkClient();
         assertTrue(zkClient.getClientConfig().isSaslClientEnabled());
+        ExecutorService executor = sourceStore.getEventExecutor();
         final Runnable verify = () -> {
             String currentThreadName = Thread.currentThread().getName();
             String errorMessage = String.format("Expect to switch to thread 
%s, but currently it is thread %s",
@@ -482,36 +486,40 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
             
assertTrue(Thread.currentThread().getName().startsWith(metadataStoreName), 
errorMessage);
         };
 
+        // Use thenApplyAsync to ensure the callback is scheduled on the 
store's executor.
+        // thenApply on an already-completed future runs synchronously on the 
calling thread,
+        // which would cause a false failure.
+
         // put with node which has parent(but the parent node is not exists).
-        store.put(prefix + "/a1/b1/c1", "value".getBytes(), 
Optional.of(-1L)).thenApply((ignore) -> {
+        store.put(prefix + "/a1/b1/c1", "value".getBytes(), 
Optional.of(-1L)).thenApplyAsync((ignore) -> {
             verify.run();
             return null;
-        }).join();
+        }, executor).join();
         // put.
-        store.put(prefix + "/b1", "value".getBytes(), 
Optional.of(-1L)).thenApply((ignore) -> {
+        store.put(prefix + "/b1", "value".getBytes(), 
Optional.of(-1L)).thenApplyAsync((ignore) -> {
             verify.run();
             return null;
-        }).join();
+        }, executor).join();
         // get.
-        store.get(prefix + "/b1").thenApply((ignore) -> {
+        store.get(prefix + "/b1").thenApplyAsync((ignore) -> {
             verify.run();
             return null;
-        }).join();
+        }, executor).join();
         // get the node which is not exists.
-        store.get(prefix + "/non").thenApply((ignore) -> {
+        store.get(prefix + "/non").thenApplyAsync((ignore) -> {
             verify.run();
             return null;
-        }).join();
+        }, executor).join();
         // delete.
-        store.delete(prefix + "/b1", Optional.empty()).thenApply((ignore) -> {
+        store.delete(prefix + "/b1", Optional.empty()).thenApplyAsync((ignore) 
-> {
             verify.run();
             return null;
-        }).join();
+        }, executor).join();
         // delete the node which is not exists.
-        store.delete(prefix + "/non", Optional.empty()).thenApply((ignore) -> {
+        store.delete(prefix + "/non", 
Optional.empty()).thenApplyAsync((ignore) -> {
             verify.run();
             return null;
-        }).exceptionally(ex -> {
+        }, executor).exceptionally(ex -> {
             verify.run();
             return null;
         }).join();

Reply via email to