This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 905e8ef730c [improve][meta] Make session notification to be async. (#19869)
905e8ef730c is described below

commit 905e8ef730c96cbb43ad5c892f1dbfc3cd1521cc
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Mar 21 21:31:22 2023 +0800

    [improve][meta] Make session notification to be async. (#19869)
---
 .../org/apache/pulsar/common/util/FutureUtil.java  | 26 ++++++++++++++++++++++
 .../coordination/impl/LeaderElectionImpl.java      | 23 ++++++++++---------
 .../coordination/impl/LockManagerImpl.java         | 22 +++++++++---------
 3 files changed, 48 insertions(+), 23 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 531769a1e45..2b082b4a789 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -36,6 +37,7 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 
 /**
@@ -238,6 +240,30 @@ public class FutureUtil {
         return future;
     }
 
+    /**
+     * @throws RejectedExecutionException if this task cannot be accepted for execution
+     * @throws NullPointerException if one of params is null
+     */
+    public static <T> @Nonnull CompletableFuture<T> composeAsync(Supplier<CompletableFuture<T>> futureSupplier,
+                                                                 Executor executor) {
+        Objects.requireNonNull(futureSupplier);
+        Objects.requireNonNull(executor);
+        final CompletableFuture<T> future = new CompletableFuture<>();
+        try {
+            executor.execute(() -> futureSupplier.get().whenComplete((result, error) -> {
+                if (error != null) {
+                    future.completeExceptionally(error);
+                    return;
+                }
+                future.complete(result);
+            }));
+        } catch (RejectedExecutionException ex) {
+            future.completeExceptionally(ex);
+        }
+        return future;
+    }
+
+
     /**
      * Creates a low-overhead timeout exception which is performance optimized to minimize allocations
      * and cpu consumption. It sets the stacktrace of the exception to the given source class and
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index c1121b1309c..11ae62226e7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -24,7 +24,6 @@ import java.util.EnumSet;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -32,6 +31,7 @@ import java.util.function.Consumer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataCacheConfig;
@@ -62,6 +62,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
     private Optional<T> proposedValue;
 
     private final ScheduledExecutorService executor;
+    private final FutureUtil.Sequencer<Void> sequencer;
 
     private enum InternalState {
         Init, ElectionInProgress, LeaderIsPresent, Closed
@@ -85,7 +86,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
         this.internalState = InternalState.Init;
         this.stateChangesListener = stateChangesListener;
         this.executor = executor;
-
+        this.sequencer = FutureUtil.Sequencer.create();
         store.registerListener(this::handlePathNotification);
         store.registerSessionListener(this::handleSessionNotification);
         updateCachedValueFuture = executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue),
@@ -277,18 +278,18 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
 
     private void handleSessionNotification(SessionEvent event) {
         // Ensure we're only processing one session event at a time.
-        executor.execute(SafeRunnable.safeRun(() -> {
+        sequencer.sequential(() -> FutureUtil.composeAsync(() -> {
             if (event == SessionEvent.SessionReestablished) {
                 log.info("Revalidating leadership for {}", path);
-
-                try {
-                    LeaderElectionState les = elect().get();
-                    log.info("Resynced leadership for {} - State: {}", path, les);
-                } catch (ExecutionException | InterruptedException e) {
-                    log.warn("Failure when processing session event", e);
-                }
+                return elect().thenAccept(leaderState -> {
+                    log.info("Resynced leadership for {} - State: {}", path, leaderState);
+                }).exceptionally(ex -> {
+                    log.warn("Failure when processing session event", ex);
+                    return null;
+                });
             }
-        }));
+            return CompletableFuture.completedFuture(null);
+        }, executor));
     }
 
     private void handlePathNotification(Notification notification) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index f5e7e0528bd..4da6b7998a0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -27,11 +27,9 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.util.SafeRunnable;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -53,6 +51,7 @@ class LockManagerImpl<T> implements LockManager<T> {
     private final MetadataStoreExtended store;
     private final MetadataCache<T> cache;
     private final MetadataSerde<T> serde;
+    private final FutureUtil.Sequencer<Void> sequencer;
     private final ExecutorService executor;
 
     private enum State {
@@ -72,6 +71,7 @@ class LockManagerImpl<T> implements LockManager<T> {
         this.cache = store.getMetadataCache(serde);
         this.serde = serde;
         this.executor = executor;
+        this.sequencer = FutureUtil.Sequencer.create();
         store.registerSessionListener(this::handleSessionEvent);
         store.registerListener(this::handleDataNotification);
     }
@@ -118,9 +118,8 @@ class LockManagerImpl<T> implements LockManager<T> {
     private void handleSessionEvent(SessionEvent se) {
         // We want to make sure we're processing one event at a time and that we're done with one event before going
         // for the next one.
-        executor.execute(SafeRunnable.safeRun(() -> {
-            List<CompletableFuture<Void>> futures = new ArrayList<>();
-
+        sequencer.sequential(() -> FutureUtil.composeAsync(() -> {
+            final List<CompletableFuture<Void>> futures = new ArrayList<>();
             if (se == SessionEvent.SessionReestablished) {
                 log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
                 for (ResourceLockImpl<T> lock : locks.values()) {
@@ -133,13 +132,12 @@ class LockManagerImpl<T> implements LockManager<T> {
                     futures.add(lock.revalidateIfNeededAfterReconnection());
                 }
             }
-
-            try {
-                FutureUtil.waitForAll(futures).get();
-            } catch (ExecutionException | InterruptedException e) {
-                log.warn("Failure when processing session event", e);
-            }
-        }));
+            return FutureUtil.waitForAll(futures)
+                    .exceptionally(ex -> {
+                        log.warn("Failure when processing session event", ex);
+                        return null;
+                    });
+        }, executor));
     }
 
     private void handleDataNotification(Notification n) {

Reply via email to