This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c1f2a2b2607 [fix][meta] Keep the leader value in the election cycle and make leader reads authoritative (#26000)
c1f2a2b2607 is described below
commit c1f2a2b260728f00f0428db38ea5c57339163828
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jun 12 08:40:43 2026 +0300
[fix][meta] Keep the leader value in the election cycle and make leader reads authoritative (#26000)
---
.../pulsar/broker/admin/impl/BrokersBase.java | 7 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 43 ++---
.../broker/loadbalance/LeaderElectionService.java | 10 ++
.../pulsar/broker/namespace/NamespaceService.java | 137 +++++++++-------
.../loadbalance/LeaderElectionServiceTest.java | 4 +
.../metadata/api/coordination/LeaderElection.java | 23 ++-
.../coordination/impl/LeaderElectionImpl.java | 175 +++++++++++++++------
.../replication/AutoRecoveryMainTest.java | 10 +-
.../apache/pulsar/metadata/LeaderElectionTest.java | 155 ++++++++++++++++++
.../coordination/impl/LeaderElectionImplTest.java | 28 ++++
10 files changed, 454 insertions(+), 138 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index e728b3b9abd..12ab0760975 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -140,8 +140,11 @@ public class BrokersBase extends AdminResource {
public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) {
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(),
pulsar().getBrokerId(), BrokerOperation.GET_LEADER_BROKER)
- .thenAccept(__ -> {
- LeaderBroker leaderBroker =
pulsar().getLeaderElectionService().getCurrentLeader()
+ // The authoritative read: waits for an in-progress leader election to settle
+ // instead of returning 404 while a re-election is still in flight.
+ .thenCompose(__ ->
pulsar().getLeaderElectionService().readCurrentLeader())
+ .thenAccept(leader -> {
+ LeaderBroker leaderBroker = leader
.orElseThrow(() -> new
RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
BrokerInfo brokerInfo = BrokerInfo.builder()
.serviceUrl(leaderBroker.getServiceUrl())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index baff684bd08..bff682e0d1b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -57,7 +57,6 @@ import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -1417,26 +1416,28 @@ public abstract class NamespacesBase extends
AdminResource {
if (this.isLeaderBroker()) {
return CompletableFuture.completedFuture(null);
}
- Optional<LeaderBroker> currentLeaderOpt =
pulsar().getLeaderElectionService().getCurrentLeader();
- if (currentLeaderOpt.isEmpty()) {
- String errorStr = "The current leader is empty.";
- log.error(errorStr);
- return FutureUtil.failedFuture(new
RestException(Response.Status.PRECONDITION_FAILED, errorStr));
- }
- LeaderBroker leaderBroker =
pulsar().getLeaderElectionService().getCurrentLeader().get();
- String leaderBrokerId = leaderBroker.getBrokerId();
- LookupOptions lookupOptions = LookupOptions.builder()
-
.webServiceAdvertisedListenerName(getWebServiceListenerName()).build();
- return pulsar().getNamespaceService()
- .createLookupResult(leaderBrokerId, false, lookupOptions)
- .thenCompose(lookupResult -> {
- URI redirectUri =
lookupResult.toRedirectUri(uri.getRequestUri());
- log.debug()
- .attr("leaderBrokerId", leaderBrokerId)
- .attr("redirectUri", redirectUri).log("Redirecting
the request call to leader broker");
- return FutureUtil.failedFuture(
- new
WebApplicationException(Response.temporaryRedirect(redirectUri).build()));
- });
+ // The authoritative read: waits for an in-progress leader election to settle instead of
+ // failing the request while a re-election is still in flight.
+ return
pulsar().getLeaderElectionService().readCurrentLeader().thenCompose(currentLeaderOpt
-> {
+ if (currentLeaderOpt.isEmpty()) {
+ String errorStr = "The current leader is empty.";
+ log.error(errorStr);
+ return FutureUtil.failedFuture(new
RestException(Response.Status.PRECONDITION_FAILED, errorStr));
+ }
+ String leaderBrokerId = currentLeaderOpt.get().getBrokerId();
+ LookupOptions lookupOptions = LookupOptions.builder()
+
.webServiceAdvertisedListenerName(getWebServiceListenerName()).build();
+ return pulsar().getNamespaceService()
+ .createLookupResult(leaderBrokerId, false, lookupOptions)
+ .thenCompose(lookupResult -> {
+ URI redirectUri =
lookupResult.toRedirectUri(uri.getRequestUri());
+ log.debug()
+ .attr("leaderBrokerId", leaderBrokerId)
+ .attr("redirectUri",
redirectUri).log("Redirecting the request call to leader broker");
+ return FutureUtil.failedFuture(
+ new
WebApplicationException(Response.temporaryRedirect(redirectUri).build()));
+ });
+ });
}
public CompletableFuture<Void> setNamespaceBundleAffinityAsync(String
bundleRange, String destinationBroker) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 2e53b54e98f..21f67bd6b36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -56,10 +56,20 @@ public class LeaderElectionService implements AutoCloseable
{
leaderElection.close();
}
+ /**
+ * Authoritative read of the current leader: if a leader election is in progress, the returned
+ * future completes once it settles (bounded by the default metadata operation timeout). Use
+ * this whenever a decision is made based on who the leader is.
+ */
public CompletableFuture<Optional<LeaderBroker>> readCurrentLeader() {
return leaderElection.getLeaderValue();
}
+ /**
+ * Non-blocking snapshot of the current leader; empty while a re-election is settling even
+ * though a leader may technically exist. Only suitable for best-effort uses such as logging —
+ * decision-making callers must use {@link #readCurrentLeader()}.
+ */
public Optional<LeaderBroker> getCurrentLeader() {
return leaderElection.getLeaderValueIfPresent();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fb7c3d68bf6..91632c1ee4c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -59,7 +59,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
@@ -561,7 +560,6 @@ public class NamespaceService implements AutoCloseable {
private void searchForCandidateBroker(NamespaceBundle bundle,
CompletableFuture<Optional<LookupResult>> lookupFuture,
LookupOptions options) {
- String candidateBroker;
LeaderElectionService les = pulsar.getLeaderElectionService();
if (les == null) {
log.warn()
@@ -572,71 +570,102 @@ public class NamespaceService implements AutoCloseable {
return;
}
- boolean authoritativeRedirect = les.isLeader();
+ selectCandidateBroker(bundle, options, les)
+ .thenAcceptAsync(selection -> {
+ if (selection.isEmpty()) {
+ log.warn()
+ .attr("namespaceBundle", bundle)
+ .log("Load manager didn't return any available
broker. Returning empty result to"
+ + " lookup. NamespaceBundle");
+ lookupFuture.complete(Optional.empty());
+ return;
+ }
+ acquireOwnershipOrRedirect(bundle, options,
selection.get(), lookupFuture);
+ }, pulsar.getExecutor())
+ .exceptionally(e -> {
+ log.warn()
+ .attr("acquire", bundle)
+ .exception(e)
+ .log("Error when searching for candidate broker to
acquire");
+
lookupFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+ return null;
+ });
+ }
- try {
- // check if this is Heartbeat or SLAMonitor namespace
- candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb ->
- CompletableFuture.completedFuture(isBrokerActive(cb)))
- .get(config.getMetadataStoreOperationTimeoutSeconds(),
SECONDS);
+ /** The broker selected for a bundle assignment, and whether the redirect to it is authoritative. */
+ private record CandidateBrokerSelection(String candidateBroker, boolean
authoritativeRedirect) { }
- if (candidateBroker == null) {
- Optional<LeaderBroker> currentLeader =
pulsar.getLeaderElectionService().getCurrentLeader();
+ private CompletableFuture<Optional<CandidateBrokerSelection>>
selectCandidateBroker(
+ NamespaceBundle bundle, LookupOptions options,
LeaderElectionService les) {
+ boolean authoritativeRedirect = les.isLeader();
- if (options.isAuthoritative()) {
- // leader broker already assigned the current broker as
owner
- candidateBroker = pulsar.getBrokerId();
- } else {
+ // check if this is Heartbeat or SLAMonitor namespace
+ return getHeartbeatOrSLAMonitorBrokerId(bundle, cb ->
+ CompletableFuture.completedFuture(isBrokerActive(cb)))
+ .thenComposeAsync(heartbeatOrSlaBroker -> {
+ if (heartbeatOrSlaBroker != null) {
+ return completedSelection(heartbeatOrSlaBroker,
authoritativeRedirect);
+ }
+ if (options.isAuthoritative()) {
+ // leader broker already assigned the current broker
as owner
+ return completedSelection(pulsar.getBrokerId(),
authoritativeRedirect);
+ }
LoadManager loadManager = this.loadManager.get();
- boolean makeLoadManagerDecisionOnThisBroker =
!loadManager.isCentralized() || les.isLeader();
- if (!makeLoadManagerDecisionOnThisBroker) {
- // If leader is not active, fallback to pick the least
loaded from current broker loadmanager
+ if (!loadManager.isCentralized() || les.isLeader()) {
+ return selectLeastLoadedBroker(bundle);
+ }
+ // The load manager decision belongs to the leader: read the leader
+ // authoritatively (waits for an in-progress election to settle) instead of
+ // acting on a possibly-empty snapshot during a leadership handoff.
+ return
les.readCurrentLeader().thenComposeAsync(currentLeader -> {
boolean leaderBrokerActive = currentLeader.isPresent()
&&
isBrokerActive(currentLeader.get().getBrokerId());
- if (!leaderBrokerActive) {
- makeLoadManagerDecisionOnThisBroker = true;
- if (currentLeader.isEmpty()) {
- log.warn()
- .attr("namespaceBundle", bundle)
- .log("Leader broker info unavailable."
- + " Using decentralized load"
- + " manager decisions");
- } else {
- log.warn()
- .attr("broker", currentLeader.get())
- .attr("namespaceBundle", bundle)
- .log("The current leader broker isn't
active. Handling load manager"
- + " decisions in a
decentralized way. NamespaceBundle");
- }
+ if (leaderBrokerActive) {
+ // forward to leader broker to make assignment
+ return
completedSelection(currentLeader.get().getBrokerId(), authoritativeRedirect);
}
- }
- if (makeLoadManagerDecisionOnThisBroker) {
- Optional<String> availableBroker =
getLeastLoadedFromLoadManager(bundle);
- if (availableBroker.isEmpty()) {
+ // If leader is not active, fallback to pick the least
loaded from current broker loadmanager
+ if (currentLeader.isEmpty()) {
log.warn()
.attr("namespaceBundle", bundle)
- .log("Load manager didn't return any
available broker. Returning empty result to"
- + " lookup. NamespaceBundle");
- lookupFuture.complete(Optional.empty());
- return;
+ .log("Leader broker info unavailable."
+ + " Using decentralized load"
+ + " manager decisions");
+ } else {
+ log.warn()
+ .attr("broker", currentLeader.get())
+ .attr("namespaceBundle", bundle)
+ .log("The current leader broker isn't
active. Handling load manager"
+ + " decisions in a decentralized
way. NamespaceBundle");
}
- candidateBroker = availableBroker.get();
- authoritativeRedirect = true;
- } else {
- // forward to leader broker to make assignment
- candidateBroker = currentLeader.get().getBrokerId();
- }
- }
- }
+ return selectLeastLoadedBroker(bundle);
+ }, pulsar.getExecutor());
+ }, pulsar.getExecutor());
+ }
+
+ private static CompletableFuture<Optional<CandidateBrokerSelection>>
completedSelection(
+ String candidateBroker, boolean authoritativeRedirect) {
+ return CompletableFuture.completedFuture(
+ Optional.of(new CandidateBrokerSelection(candidateBroker,
authoritativeRedirect)));
+ }
+
+ // The decentralized decision is authoritative: this broker picked the owner itself.
+ private CompletableFuture<Optional<CandidateBrokerSelection>>
selectLeastLoadedBroker(NamespaceBundle bundle) {
+ Optional<String> availableBroker;
+ try {
+ availableBroker = getLeastLoadedFromLoadManager(bundle);
} catch (Exception e) {
- log.warn()
- .attr("acquire", bundle)
- .exception(e)
- .log("Error when searching for candidate broker to
acquire");
- lookupFuture.completeExceptionally(e);
- return;
+ return CompletableFuture.failedFuture(e);
}
+ return CompletableFuture.completedFuture(
+ availableBroker.map(broker -> new
CandidateBrokerSelection(broker, true)));
+ }
+ private void acquireOwnershipOrRedirect(NamespaceBundle bundle,
LookupOptions options,
+ CandidateBrokerSelection selection,
+
CompletableFuture<Optional<LookupResult>> lookupFuture) {
+ final String candidateBroker = selection.candidateBroker();
+ final boolean authoritativeRedirect =
selection.authoritativeRedirect();
try {
Objects.requireNonNull(candidateBroker);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 0bdeb4341d9..0bfc626398f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import com.google.common.collect.Sets;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -116,6 +117,9 @@ public class LeaderElectionServiceTest {
leaderBrokerReference.get() != null);
Mockito.when(leaderElectionService.getCurrentLeader())
.thenAnswer(invocation ->
Optional.ofNullable(leaderBrokerReference.get()));
+ Mockito.when(leaderElectionService.readCurrentLeader())
+ .thenAnswer(invocation ->
+
CompletableFuture.completedFuture(Optional.ofNullable(leaderBrokerReference.get())));
leaderElectionServiceReference.set(leaderElectionService);
// broker, webService and leaderElectionService is started, but elect
not ready;
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java
index 016b7d061d0..4b34815c558 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java
@@ -45,19 +45,30 @@ public interface LeaderElection<T> extends AutoCloseable {
LeaderElectionState getState();
/**
- * Get the value set by the elected leader, or empty if there's currently no leader.
+ * Get the value set by the elected leader.
+ * <p>
+ * This is the authoritative read: if a leader election is currently in progress (e.g. the
+ * previous leader's node was just deleted and the participants are re-electing), the returned
+ * future completes once the election has settled, with the newly determined leader value. The
+ * future completes exceptionally with a {@link java.util.concurrent.TimeoutException} if the
+ * election does not complete within the default metadata operation timeout.
+ * <p>
+ * An instance that never participated in the election (no {@link #elect(Object)} call) reads
+ * the leader value directly from the metadata store. A closed instance does not wait: it
+ * reports an empty leader if it held the leadership when closed, or its last known view
+ * otherwise.
*
* @return a future that will track the completion of the operation
*/
CompletableFuture<Optional<T>> getLeaderValue();
/**
- * Get the value set by the elected leader, or empty if there's currently no leader.
+ * Get a non-blocking snapshot of the value set by the elected leader, or empty if no leader is
+ * known right now.
* <p>
- * The call is non blocking and in certain cases can return <code>Optional.empty()</code> even though a leader is
- * technically elected.
- *
- * @return a future that will track the completion of the operation
+ * The snapshot can return <code>Optional.empty()</code> even though a leader is technically
+ * elected (for example while a re-election is still settling). Callers that need the
+ * authoritative leader must use {@link #getLeaderValue()} instead.
*/
Optional<T> getLeaderValueIfPresent();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index ce8167271f5..302d18b574c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -20,20 +20,18 @@ package org.apache.pulsar.metadata.coordination.impl;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.CustomLog;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
-import org.apache.pulsar.metadata.api.MetadataCache;
-import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException;
@@ -52,14 +50,19 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
private final String path;
private final MetadataSerde<T> serde;
private final MetadataStoreExtended store;
- private final MetadataCache<T> cache;
private final Consumer<LeaderElectionState> stateChangesListener;
- private final ScheduledFuture<?> updateCachedValueFuture;
private LeaderElectionState leaderElectionState;
private Optional<Long> version = Optional.empty();
private Optional<T> proposedValue;
+ // The leader value as known by the election cycle (the leader can only change through an
+ // election cycle). Pending while no leader is known — election in progress or the leader node
+ // deleted — and completed with the leader value once the election settles. Readers of
+ // getLeaderValue() wait on it (bounded by leaderElectionCompletionTimeoutSeconds);
+ // getLeaderValueIfPresent() takes a non-blocking snapshot of it.
+ private CompletableFuture<Optional<T>> currentLeaderFuture = new
CompletableFuture<>();
+
private final ScheduledExecutorService executor;
private final FutureUtil.Sequencer<Void> sequencer;
@@ -71,16 +74,21 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
private static final int LEADER_ELECTION_RETRY_DELAY_SECONDS = 5;
+ // Upper bound for getLeaderValue() waiting on an election that never settles, aligned with the
+ // default metadata-store operation timeout (the broker's metadataStoreOperationTimeoutSeconds).
+ private volatile int leaderElectionCompletionTimeoutSeconds = 30;
+
+ @VisibleForTesting
+ void setLeaderElectionCompletionTimeoutSeconds(int
leaderElectionCompletionTimeoutSeconds) {
+ this.leaderElectionCompletionTimeoutSeconds =
leaderElectionCompletionTimeoutSeconds;
+ }
+
LeaderElectionImpl(MetadataStoreExtended store, Class<T> clazz, String
path,
Consumer<LeaderElectionState> stateChangesListener,
ScheduledExecutorService executor) {
this.path = path;
this.serde = new
JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz,
null));
this.store = store;
- MetadataCacheConfig<?> metadataCacheConfig =
MetadataCacheConfig.builder()
- .expireAfterWriteMillis(-1L)
- .build();
- this.cache = store.getMetadataCache(clazz, metadataCacheConfig);
this.leaderElectionState = LeaderElectionState.NoLeader;
this.internalState = InternalState.Init;
this.stateChangesListener = stateChangesListener;
@@ -88,13 +96,38 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
this.sequencer = FutureUtil.Sequencer.create();
store.registerListener(this::handlePathNotification);
store.registerSessionListener(this::handleSessionNotification);
- updateCachedValueFuture =
executor.scheduleWithFixedDelay(this::getLeaderValue,
- metadataCacheConfig.getRefreshAfterWriteMillis() / 2,
- metadataCacheConfig.getRefreshAfterWriteMillis(),
TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Record the leader value determined by the election cycle, waking up any getLeaderValue()
+ * callers waiting for the election to settle.
+ */
+ private synchronized void leaderKnown(Optional<T> leaderValue) {
+ if (currentLeaderFuture.isDone()) {
+ currentLeaderFuture =
CompletableFuture.completedFuture(leaderValue);
+ } else {
+ currentLeaderFuture.complete(leaderValue);
+ }
+ }
+
+ /**
+ * Mark the leader as unknown (the leader node was deleted) so getLeaderValue() callers wait for
+ * the next election cycle to settle instead of observing a stale value.
+ */
+ private synchronized void leaderUnknown() {
+ if (currentLeaderFuture.isDone()) {
+ currentLeaderFuture = new CompletableFuture<>();
+ }
}
@Override
public synchronized CompletableFuture<LeaderElectionState> elect(T
proposedValue) {
+ if (internalState == InternalState.Closed) {
+ // Reopened after close() (e.g. the broker's LeaderElectionService is close()d and then
+ // start()ed again): reset so a fresh election cycle runs and readers wait for it.
+ leaderElectionState = LeaderElectionState.NoLeader;
+ currentLeaderFuture = new CompletableFuture<>();
+ }
if (leaderElectionState != LeaderElectionState.NoLeader) {
return CompletableFuture.completedFuture(leaderElectionState);
}
@@ -112,12 +145,6 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
} else {
return tryToBecomeLeader();
}
- }).thenCompose(leaderElectionState -> {
- // make sure that the cache contains the current leader
- // so that getLeaderValueIfPresent works on all brokers
- cache.refresh(path);
- return cache.get(path)
- .thenApply(__ -> leaderElectionState);
});
}
@@ -137,6 +164,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
log.info().attr("value", existingValue).attr("path",
path).attr("stat", res.getStat())
.log("Keeping the existing value as it's from the same
session");
// The value is still valid because it was created in the same
session
+ leaderKnown(Optional.of(existingValue));
changeState(LeaderElectionState.Leading);
return
CompletableFuture.completedFuture(LeaderElectionState.Leading);
} else {
@@ -160,6 +188,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
}
// If the existing value is different, it means there's already
another leader
+ leaderKnown(Optional.of(existingValue));
changeState(LeaderElectionState.Following);
return
CompletableFuture.completedFuture(LeaderElectionState.Following);
}
@@ -190,37 +219,19 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
.thenAccept(stat -> {
synchronized (LeaderElectionImpl.this) {
if (internalState == InternalState.ElectionInProgress)
{
- // Do a get() in order to force a notification
later, if the z-node disappears
- cache.get(path)
- .thenRun(() -> {
- synchronized (LeaderElectionImpl.this)
{
- log.info().attr("path",
path).attr("value", value)
- .log("Acquired
leadership");
- internalState =
InternalState.LeaderIsPresent;
- if (leaderElectionState !=
LeaderElectionState.Leading) {
- leaderElectionState =
LeaderElectionState.Leading;
- try {
-
stateChangesListener.accept(leaderElectionState);
- } catch (Throwable t) {
-
log.warn().exception(t).log("Exception in state change listener");
- }
- }
-
result.complete(leaderElectionState);
- }
- }).exceptionally(ex -> {
- // We fail to do the get(), so clean
up the leader election fail the whole
- // operation
- log.warn().attr("path",
path).exception(ex)
- .log("Failed to get the
current state after acquiring leadership."
- + " Conditionally
deleting current entry.");
- store.delete(path,
Optional.of(stat.getVersion()))
- .thenRun(() ->
result.completeExceptionally(ex))
- .exceptionally(ex2 -> {
-
result.completeExceptionally(ex2);
- return null;
- });
- return null;
- });
+ log.info().attr("path", path).attr("value", value)
+ .log("Acquired leadership");
+ internalState = InternalState.LeaderIsPresent;
+ leaderKnown(Optional.of(value));
+ if (leaderElectionState !=
LeaderElectionState.Leading) {
+ leaderElectionState =
LeaderElectionState.Leading;
+ try {
+
stateChangesListener.accept(leaderElectionState);
+ } catch (Throwable t) {
+ log.warn().exception(t).log("Exception in
state change listener");
+ }
+ }
+ result.complete(leaderElectionState);
} else {
log.info().attr("path", path).attr("value",
value).attr("stat", stat)
.log("Leadership was lost. Conditionally
deleting entry.");
@@ -259,7 +270,6 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
@Override
public void close() throws Exception {
- updateCachedValueFuture.cancel(true);
try {
asyncClose().join();
} catch (CompletionException e) {
@@ -274,6 +284,12 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
}
internalState = InternalState.Closed;
+ // A closed election reports "no leader" rather than waiting or failing: callers like the
+ // extensible load manager's handleNoChannelOwnerError() key off the resulting
+ // "no channel owner" condition to restart the election.
+ if (!currentLeaderFuture.isDone()) {
+ currentLeaderFuture.complete(Optional.empty());
+ }
if (leaderElectionState != LeaderElectionState.Leading) {
return CompletableFuture.completedFuture(null);
@@ -283,6 +299,9 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
.thenAccept(__ -> {
synchronized (LeaderElectionImpl.this) {
leaderElectionState =
LeaderElectionState.NoLeader;
+ // The deleted leader node was ours and a closed instance no longer
+ // observes elections; don't keep reporting ourselves as leader.
+ currentLeaderFuture =
CompletableFuture.completedFuture(Optional.empty());
}
}
);
@@ -295,12 +314,61 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
@Override
public CompletableFuture<Optional<T>> getLeaderValue() {
- return cache.get(path);
+ CompletableFuture<Optional<T>> future;
+ synchronized (this) {
+ if (internalState == InternalState.Init) {
+ // This instance never participated in the election (a pure observer, e.g.
+ // BookKeeper's MetadataDrivers helpers querying the current auditor): there is no
+ // local election cycle to wait for, so the store content is the authoritative
+ // answer.
+ return readLeaderValueFromStore();
+ }
+ future = currentLeaderFuture;
+ }
+ if (future.isDone()) {
+ // Hand out a derived future so callers cannot complete the internal one.
+ return future.thenApply(value -> value);
+ }
+ int timeoutSeconds = leaderElectionCompletionTimeoutSeconds;
+ return FutureUtil.addTimeoutHandling(whenLeaderKnown(future),
+ Duration.ofSeconds(timeoutSeconds), executor,
+ () -> FutureUtil.createTimeoutException(
+ "Leader election on path " + path + " did not complete
within "
+ + timeoutSeconds + " seconds",
+ LeaderElectionImpl.class, "getLeaderValue()"));
+ }
+
+ private CompletableFuture<Optional<T>> readLeaderValueFromStore() {
+ return store.get(path).thenApply(optRes -> optRes.map(res -> {
+ try {
+ return serde.deserialize(path, res.getValue(), res.getStat());
+ } catch (Throwable t) {
+ throw new CompletionException(t);
+ }
+ }));
+ }
+
+ // Track the internal future without exposing it: completing/cancelling the returned future
+ // (e.g. by the timeout handling) must not complete the election's own future.
+ private CompletableFuture<Optional<T>>
whenLeaderKnown(CompletableFuture<Optional<T>> future) {
+ CompletableFuture<Optional<T>> result = new CompletableFuture<>();
+ future.whenComplete((value, ex) -> {
+ if (ex != null) {
+ result.completeExceptionally(ex);
+ } else {
+ result.complete(value);
+ }
+ });
+ return result;
}
@Override
public Optional<T> getLeaderValueIfPresent() {
- return cache.getIfCached(path);
+ CompletableFuture<Optional<T>> future;
+ synchronized (this) {
+ future = currentLeaderFuture;
+ }
+ return future.isDone() && !future.isCompletedExceptionally() ?
future.join() : Optional.empty();
}
private void handleSessionNotification(SessionEvent event) {
@@ -338,6 +406,9 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
}
leaderElectionState = LeaderElectionState.NoLeader;
+ // The leader is unknown until the re-election below settles; getLeaderValue()
+ // callers wait for it instead of observing the stale value.
+ leaderUnknown();
if (proposedValue.isPresent()) {
elect()
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 146e4f0dd58..47d5e9f1f79 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -143,12 +143,16 @@ public class AutoRecoveryMainTest extends
BookKeeperClusterTestCase {
}
BookieId currentAuditor = main1.auditorElector.getCurrentAuditor();
assertNotNull(currentAuditor);
- Auditor auditor1 = main1.auditorElector.getAuditor();
assertEquals("Current Auditor should be AR1", currentAuditor,
BookieImpl.getBookieId(confByIndex(0)));
+ // getCurrentAuditor() can resolve as soon as the election settles, before the elector
+ // thread has constructed the Auditor instance — re-read getAuditor() on every poll instead
+ // of capturing a possibly-null reference once.
Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
- assertNotNull(auditor1);
- assertTrue("Auditor of AR1 should be running",
auditor1.isRunning());
+ Auditor a1 = main1.auditorElector.getAuditor();
+ assertNotNull(a1);
+ assertTrue("Auditor of AR1 should be running", a1.isRunning());
});
+ Auditor auditor1 = main1.auditorElector.getAuditor();
/*
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
index 4b48f3c20b0..cb269537383 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
@@ -18,10 +18,13 @@
*/
package org.apache.pulsar.metadata;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import java.util.EnumSet;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -35,6 +38,7 @@ import
org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
+import org.awaitility.Awaitility;
import org.testng.annotations.Test;
public class LeaderElectionTest extends BaseMetadataStoreTest {
@@ -284,4 +288,155 @@ public class LeaderElectionTest extends
BaseMetadataStoreTest {
assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
assertEqualsAndRetry(() -> le.getLeaderValueIfPresent(),
Optional.of("test-1"), Optional.empty());
}
+
+ @Test(dataProvider = "impl", timeOut = 30000)
+ public void readsDoNotObserveEmptyLeaderDuringReElection(String provider,
Supplier<String> urlSupplier)
+ throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String path = newKey();
+
+ @Cleanup
+ CoordinationService cs = new CoordinationServiceImpl(store);
+
+ @Cleanup
+ LeaderElection<String> le = cs.getLeaderElection(String.class, path,
__ -> {
+ });
+
+ assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading);
+
+ // Externally delete the leader node: the instance re-elects itself. An authoritative read
+ // issued during the churn either returns the last settled value or waits for the
+ // re-election to settle — it never observes an empty leader.
+ store.delete(path, Optional.empty()).join();
+ assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
+
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(le.getState(), LeaderElectionState.Leading);
+ assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1"));
+ });
+ }
+
+ @Test(dataProvider = "zkImpls", timeOut = 30000)
+ public void followerReadsResolveToTheNewLeaderAfterHandoff(String
provider, Supplier<String> urlSupplier)
+ throws Exception {
+ @Cleanup
+ MetadataStoreExtended store1 =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+ @Cleanup
+ MetadataStoreExtended store2 =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ String path = newKey();
+
+ @Cleanup
+ CoordinationService cs1 = new CoordinationServiceImpl(store1);
+ @Cleanup
+ CoordinationService cs2 = new CoordinationServiceImpl(store2);
+
+ @Cleanup
+ LeaderElection<String> le1 = cs1.getLeaderElection(String.class, path,
__ -> {
+ });
+ @Cleanup
+ LeaderElection<String> le2 = cs2.getLeaderElection(String.class, path,
__ -> {
+ });
+
+ assertEquals(le1.elect("test-1").join(), LeaderElectionState.Leading);
+ assertEquals(le2.elect("test-2").join(),
LeaderElectionState.Following);
+ assertEquals(le2.getLeaderValue().join(), Optional.of("test-1"));
+
+ // The leader hands off: le2 re-elects itself. Authoritative reads during the handoff
+ // return one of the settled leader values and converge to the new leader, but never
+ // observe an empty leader.
+ le1.close();
+ List<Optional<String>> observed = new CopyOnWriteArrayList<>();
+ Awaitility.await().untilAsserted(() -> {
+ Optional<String> leader = le2.getLeaderValue().join();
+ observed.add(leader);
+ assertEquals(leader, Optional.of("test-2"));
+ });
+ assertThat(observed)
+ .as("authoritative reads during the leadership handoff")
+ .doesNotContain(Optional.empty());
+ }
+
+ @Test(dataProvider = "impl", timeOut = 30000)
+ public void closedLeaderReportsEmptyLeader(String provider,
Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String path = newKey();
+
+ @Cleanup
+ CoordinationService cs = new CoordinationServiceImpl(store);
+
+ LeaderElection<String> le = cs.getLeaderElection(String.class, path,
__ -> {
+ });
+
+ assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading);
+ assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
+
+ // Closing the leader releases the leadership; reads on the closed instance must report an
+ // empty leader without waiting (recovery paths key off the "no leader" condition).
+ le.close();
+ assertEquals(le.getLeaderValue().join(), Optional.empty());
+ assertEquals(le.getLeaderValueIfPresent(), Optional.empty());
+ }
+
+ @Test(dataProvider = "impl", timeOut = 30000)
+ public void electAfterCloseRunsANewElection(String provider,
Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String path = newKey();
+
+ @Cleanup
+ CoordinationService cs = new CoordinationServiceImpl(store);
+
+ @Cleanup
+ LeaderElection<String> le = cs.getLeaderElection(String.class, path,
__ -> {
+ });
+
+ assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading);
+ le.close();
+
+ // Re-electing on a closed instance reopens it (the broker's LeaderElectionService is
+ // close()d and start()ed again to force a leadership change).
+ assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading);
+ assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
+ assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1"));
+ }
+
+ @Test(dataProvider = "impl", timeOut = 30000)
+ public void observerReadsLeaderValueFromStore(String provider,
Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String path = newKey();
+
+ @Cleanup
+ CoordinationService cs = new CoordinationServiceImpl(store);
+ @Cleanup
+ CoordinationService observerCs = new CoordinationServiceImpl(store);
+
+ @Cleanup
+ LeaderElection<String> le = cs.getLeaderElection(String.class, path,
__ -> {
+ });
+ // The observer never calls elect(): there is no local election cycle to wait for, so the
+ // authoritative read goes directly to the metadata store, while the snapshot stays empty.
+ @Cleanup
+ LeaderElection<String> observer =
observerCs.getLeaderElection(String.class, path, __ -> {
+ });
+
+ assertEquals(observer.getLeaderValue().join(), Optional.empty());
+
+ assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading);
+ assertEquals(observer.getLeaderValue().join(), Optional.of("test-1"));
+ assertEquals(observer.getLeaderValueIfPresent(), Optional.empty());
+ }
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
index 09c9d71c41a..4827c4a197a 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
@@ -18,7 +18,14 @@
*/
package org.apache.pulsar.metadata.coordination.impl;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
@@ -62,4 +69,25 @@ public class LeaderElectionImplTest extends
BaseMetadataStoreTest {
});
blockFuture.join();
}
+
+ @Test(timeOut = 20000)
+ public void getLeaderValueTimesOutWhenElectionNeverCompletes() {
+ MetadataStoreExtended store = mock(MetadataStoreExtended.class);
+ // The store never answers, so the election never settles.
+ when(store.get(anyString())).thenReturn(new CompletableFuture<>());
+
+ @Cleanup("shutdownNow")
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ LeaderElectionImpl<String> le = new LeaderElectionImpl<>(store,
String.class,
+ "/getLeaderValueTimesOutWhenElectionNeverCompletes", __ -> {
+ }, executor);
+ le.setLeaderElectionCompletionTimeoutSeconds(1);
+
+ le.elect("test-1");
+
+ assertThatThrownBy(() -> le.getLeaderValue().join())
+ .hasCauseInstanceOf(TimeoutException.class)
+ .cause()
+ .hasMessageContaining("did not complete within");
+ }
}