This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 92b7cdc7f32 [fix][test] Fix flaky
MetadataStoreTest.testThreadSwitchOfZkMetadataStore (#25347)
92b7cdc7f32 is described below
commit 92b7cdc7f328afc4f262f66f465a07be96690c5b
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 18 12:13:05 2026 -0700
[fix][test] Fix flaky MetadataStoreTest.testThreadSwitchOfZkMetadataStore
(#25347)
---
.../metadata/impl/AbstractMetadataStore.java | 1 +
.../apache/pulsar/metadata/MetadataStoreTest.java | 34 +++++++++++++---------
2 files changed, 22 insertions(+), 13 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 50e53038f3e..e71c6ff2753 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -84,6 +84,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
private final CopyOnWriteArrayList<Consumer<SessionEvent>>
sessionListeners = new CopyOnWriteArrayList<>();
protected final String metadataStoreName;
private final OrderedExecutor serDesExecutor;
+ @Getter
private final ExecutorService eventExecutor;
private final ScheduledExecutorService schedulerExecutor;
private final AsyncLoadingCache<String, List<String>> childrenCache;
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 061bccad931..259261b18c6 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -40,6 +40,7 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -59,6 +60,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.impl.DualMetadataStore;
import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -473,8 +475,10 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
MetadataStoreConfig config = builder.build();
@Cleanup
DualMetadataStore store = (DualMetadataStore)
MetadataStoreFactory.create(zks.getConnectionString(), config);
- ZooKeeper zkClient = ((ZKMetadataStore)
store.getSourceStore()).getZkClient();
+ AbstractMetadataStore sourceStore = (AbstractMetadataStore)
store.getSourceStore();
+ ZooKeeper zkClient = ((ZKMetadataStore) sourceStore).getZkClient();
assertTrue(zkClient.getClientConfig().isSaslClientEnabled());
+ ExecutorService executor = sourceStore.getEventExecutor();
final Runnable verify = () -> {
String currentThreadName = Thread.currentThread().getName();
String errorMessage = String.format("Expect to switch to thread
%s, but currently it is thread %s",
@@ -482,36 +486,40 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
assertTrue(Thread.currentThread().getName().startsWith(metadataStoreName),
errorMessage);
};
+ // Use thenApplyAsync to ensure the callback is scheduled on the
store's executor.
+ // thenApply on an already-completed future runs synchronously on the
calling thread,
+ // which would cause a false failure.
+
// put with node which has parent(but the parent node is not exists).
- store.put(prefix + "/a1/b1/c1", "value".getBytes(),
Optional.of(-1L)).thenApply((ignore) -> {
+ store.put(prefix + "/a1/b1/c1", "value".getBytes(),
Optional.of(-1L)).thenApplyAsync((ignore) -> {
verify.run();
return null;
- }).join();
+ }, executor).join();
// put.
- store.put(prefix + "/b1", "value".getBytes(),
Optional.of(-1L)).thenApply((ignore) -> {
+ store.put(prefix + "/b1", "value".getBytes(),
Optional.of(-1L)).thenApplyAsync((ignore) -> {
verify.run();
return null;
- }).join();
+ }, executor).join();
// get.
- store.get(prefix + "/b1").thenApply((ignore) -> {
+ store.get(prefix + "/b1").thenApplyAsync((ignore) -> {
verify.run();
return null;
- }).join();
+ }, executor).join();
// get the node which is not exists.
- store.get(prefix + "/non").thenApply((ignore) -> {
+ store.get(prefix + "/non").thenApplyAsync((ignore) -> {
verify.run();
return null;
- }).join();
+ }, executor).join();
// delete.
- store.delete(prefix + "/b1", Optional.empty()).thenApply((ignore) -> {
+ store.delete(prefix + "/b1", Optional.empty()).thenApplyAsync((ignore)
-> {
verify.run();
return null;
- }).join();
+ }, executor).join();
// delete the node which is not exists.
- store.delete(prefix + "/non", Optional.empty()).thenApply((ignore) -> {
+ store.delete(prefix + "/non",
Optional.empty()).thenApplyAsync((ignore) -> {
verify.run();
return null;
- }).exceptionally(ex -> {
+ }, executor).exceptionally(ex -> {
verify.run();
return null;
}).join();