This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b121d854c0933309475891774b7a90911afb2c81 Author: Lari Hotari <[email protected]> AuthorDate: Wed Sep 14 11:24:57 2022 +0300 [fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620) --- .../metadata/bookkeeper/PulsarRegistrationClient.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 38e2a33ef3f..52b50e3ea4b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -21,12 +21,15 @@ package org.apache.pulsar.metadata.bookkeeper; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE; import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.discover.RegistrationClient; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.versioning.Version; @@ -46,6 +49,7 @@ public class PulsarRegistrationClient implements RegistrationClient { private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>(); private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor; public PulsarRegistrationClient(MetadataStore store, String ledgersRootPath) { @@ -60,11 +64,15 @@ public class PulsarRegistrationClient implements RegistrationClient { this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE; this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY; + this.executor = Executors + .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client")); + store.registerListener(this::updatedBookies); } @Override public void close() { + executor.shutdownNow(); } @Override @@ -99,7 +107,7 @@ public class PulsarRegistrationClient implements RegistrationClient { public CompletableFuture<Void> watchWritableBookies(RegistrationListener registrationListener) { writableBookiesWatchers.put(registrationListener, Boolean.TRUE); return getWritableBookies() - .thenAccept(registrationListener::onBookiesChanged); + .thenAcceptAsync(registrationListener::onBookiesChanged, executor); } @Override @@ -111,7 +119,7 @@ public class PulsarRegistrationClient implements RegistrationClient { public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener registrationListener) { readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE); return getReadOnlyBookies() - .thenAccept(registrationListener::onBookiesChanged); + .thenAcceptAsync(registrationListener::onBookiesChanged, executor); } @Override @@ -124,11 +132,11 @@ public class PulsarRegistrationClient implements RegistrationClient { if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) { getReadOnlyBookies().thenAccept(bookies -> readOnlyBookiesWatchers.keySet() - .forEach(w -> w.onBookiesChanged(bookies))); + .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)))); } else if (n.getPath().startsWith(bookieRegistrationPath)) { getWritableBookies().thenAccept(bookies -> writableBookiesWatchers.keySet() - .forEach(w -> w.onBookiesChanged(bookies))); + .forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies)))); } } }
