This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 905e8ef730c [improve][meta] Make session notification to be async.
(#19869)
905e8ef730c is described below
commit 905e8ef730c96cbb43ad5c892f1dbfc3cd1521cc
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Mar 21 21:31:22 2023 +0800
[improve][meta] Make session notification to be async. (#19869)
---
.../org/apache/pulsar/common/util/FutureUtil.java | 26 ++++++++++++++++++++++
.../coordination/impl/LeaderElectionImpl.java | 23 ++++++++++---------
.../coordination/impl/LockManagerImpl.java | 22 +++++++++---------
3 files changed, 48 insertions(+), 23 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 531769a1e45..2b082b4a789 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -36,6 +37,7 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
/**
@@ -238,6 +240,30 @@ public class FutureUtil {
return future;
}
+ /**
+ * @throws RejectedExecutionException if this task cannot be accepted for
execution
+ * @throws NullPointerException if one of params is null
+ */
+ public static <T> @Nonnull CompletableFuture<T>
composeAsync(Supplier<CompletableFuture<T>> futureSupplier,
+ Executor
executor) {
+ Objects.requireNonNull(futureSupplier);
+ Objects.requireNonNull(executor);
+ final CompletableFuture<T> future = new CompletableFuture<>();
+ try {
+ executor.execute(() -> futureSupplier.get().whenComplete((result,
error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ future.complete(result);
+ }));
+ } catch (RejectedExecutionException ex) {
+ future.completeExceptionally(ex);
+ }
+ return future;
+ }
+
+
/**
* Creates a low-overhead timeout exception which is performance optimized
to minimize allocations
* and cpu consumption. It sets the stacktrace of the exception to the
given source class and
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index c1121b1309c..11ae62226e7 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -24,7 +24,6 @@ import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -32,6 +31,7 @@ import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.SafeRunnable;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
@@ -62,6 +62,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
private Optional<T> proposedValue;
private final ScheduledExecutorService executor;
+ private final FutureUtil.Sequencer<Void> sequencer;
private enum InternalState {
Init, ElectionInProgress, LeaderIsPresent, Closed
@@ -85,7 +86,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
this.internalState = InternalState.Init;
this.stateChangesListener = stateChangesListener;
this.executor = executor;
-
+ this.sequencer = FutureUtil.Sequencer.create();
store.registerListener(this::handlePathNotification);
store.registerSessionListener(this::handleSessionNotification);
updateCachedValueFuture =
executor.scheduleWithFixedDelay(SafeRunnable.safeRun(this::getLeaderValue),
@@ -277,18 +278,18 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
private void handleSessionNotification(SessionEvent event) {
// Ensure we're only processing one session event at a time.
- executor.execute(SafeRunnable.safeRun(() -> {
+ sequencer.sequential(() -> FutureUtil.composeAsync(() -> {
if (event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}", path);
-
- try {
- LeaderElectionState les = elect().get();
- log.info("Resynced leadership for {} - State: {}", path,
les);
- } catch (ExecutionException | InterruptedException e) {
- log.warn("Failure when processing session event", e);
- }
+ return elect().thenAccept(leaderState -> {
+ log.info("Resynced leadership for {} - State: {}", path,
leaderState);
+ }).exceptionally(ex -> {
+ log.warn("Failure when processing session event", ex);
+ return null;
+ });
}
- }));
+ return CompletableFuture.completedFuture(null);
+ }, executor));
}
private void handlePathNotification(Notification notification) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
index f5e7e0528bd..4da6b7998a0 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java
@@ -27,11 +27,9 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -53,6 +51,7 @@ class LockManagerImpl<T> implements LockManager<T> {
private final MetadataStoreExtended store;
private final MetadataCache<T> cache;
private final MetadataSerde<T> serde;
+ private final FutureUtil.Sequencer<Void> sequencer;
private final ExecutorService executor;
private enum State {
@@ -72,6 +71,7 @@ class LockManagerImpl<T> implements LockManager<T> {
this.cache = store.getMetadataCache(serde);
this.serde = serde;
this.executor = executor;
+ this.sequencer = FutureUtil.Sequencer.create();
store.registerSessionListener(this::handleSessionEvent);
store.registerListener(this::handleDataNotification);
}
@@ -118,9 +118,8 @@ class LockManagerImpl<T> implements LockManager<T> {
private void handleSessionEvent(SessionEvent se) {
// We want to make sure we're processing one event at a time and that
we're done with one event before going
// for the next one.
- executor.execute(SafeRunnable.safeRun(() -> {
- List<CompletableFuture<Void>> futures = new ArrayList<>();
-
+ sequencer.sequential(() -> FutureUtil.composeAsync(() -> {
+ final List<CompletableFuture<Void>> futures = new ArrayList<>();
if (se == SessionEvent.SessionReestablished) {
log.info("Metadata store session has been re-established.
Revalidating all the existing locks.");
for (ResourceLockImpl<T> lock : locks.values()) {
@@ -133,13 +132,12 @@ class LockManagerImpl<T> implements LockManager<T> {
futures.add(lock.revalidateIfNeededAfterReconnection());
}
}
-
- try {
- FutureUtil.waitForAll(futures).get();
- } catch (ExecutionException | InterruptedException e) {
- log.warn("Failure when processing session event", e);
- }
- }));
+ return FutureUtil.waitForAll(futures)
+ .exceptionally(ex -> {
+ log.warn("Failure when processing session event", ex);
+ return null;
+ });
+ }, executor));
}
private void handleDataNotification(Notification n) {