This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c1f2a2b2607 [fix][meta] Keep the leader value in the election cycle and make leader reads authoritative (#26000)
c1f2a2b2607 is described below

commit c1f2a2b260728f00f0428db38ea5c57339163828
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jun 12 08:40:43 2026 +0300

    [fix][meta] Keep the leader value in the election cycle and make leader reads authoritative (#26000)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      |   7 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  43 ++---
 .../broker/loadbalance/LeaderElectionService.java  |  10 ++
 .../pulsar/broker/namespace/NamespaceService.java  | 137 +++++++++-------
 .../loadbalance/LeaderElectionServiceTest.java     |   4 +
 .../metadata/api/coordination/LeaderElection.java  |  23 ++-
 .../coordination/impl/LeaderElectionImpl.java      | 175 +++++++++++++++------
 .../replication/AutoRecoveryMainTest.java          |  10 +-
 .../apache/pulsar/metadata/LeaderElectionTest.java | 155 ++++++++++++++++++
 .../coordination/impl/LeaderElectionImplTest.java  |  28 ++++
 10 files changed, 454 insertions(+), 138 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index e728b3b9abd..12ab0760975 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -140,8 +140,11 @@ public class BrokersBase extends AdminResource {
     public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) {
         
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(),
                 pulsar().getBrokerId(), BrokerOperation.GET_LEADER_BROKER)
-                .thenAccept(__ -> {
-                    LeaderBroker leaderBroker = 
pulsar().getLeaderElectionService().getCurrentLeader()
+                // The authoritative read: waits for an in-progress leader 
election to settle
+                // instead of returning 404 while a re-election is still in 
flight.
+                .thenCompose(__ -> 
pulsar().getLeaderElectionService().readCurrentLeader())
+                .thenAccept(leader -> {
+                    LeaderBroker leaderBroker = leader
                             .orElseThrow(() -> new 
RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
                     BrokerInfo brokerInfo = BrokerInfo.builder()
                             .serviceUrl(leaderBroker.getServiceUrl())
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index baff684bd08..bff682e0d1b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -57,7 +57,6 @@ import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -1417,26 +1416,28 @@ public abstract class NamespacesBase extends 
AdminResource {
         if (this.isLeaderBroker()) {
             return CompletableFuture.completedFuture(null);
         }
-        Optional<LeaderBroker> currentLeaderOpt = 
pulsar().getLeaderElectionService().getCurrentLeader();
-        if (currentLeaderOpt.isEmpty()) {
-            String errorStr = "The current leader is empty.";
-            log.error(errorStr);
-            return FutureUtil.failedFuture(new 
RestException(Response.Status.PRECONDITION_FAILED, errorStr));
-        }
-        LeaderBroker leaderBroker = 
pulsar().getLeaderElectionService().getCurrentLeader().get();
-        String leaderBrokerId = leaderBroker.getBrokerId();
-        LookupOptions lookupOptions = LookupOptions.builder()
-                
.webServiceAdvertisedListenerName(getWebServiceListenerName()).build();
-        return pulsar().getNamespaceService()
-                .createLookupResult(leaderBrokerId, false, lookupOptions)
-                .thenCompose(lookupResult -> {
-                    URI redirectUri = 
lookupResult.toRedirectUri(uri.getRequestUri());
-                    log.debug()
-                            .attr("leaderBrokerId", leaderBrokerId)
-                            .attr("redirectUri", redirectUri).log("Redirecting 
the request call to leader broker");
-                    return FutureUtil.failedFuture(
-                            new 
WebApplicationException(Response.temporaryRedirect(redirectUri).build()));
-                });
+        // The authoritative read: waits for an in-progress leader election to 
settle instead of
+        // failing the request while a re-election is still in flight.
+        return 
pulsar().getLeaderElectionService().readCurrentLeader().thenCompose(currentLeaderOpt
 -> {
+            if (currentLeaderOpt.isEmpty()) {
+                String errorStr = "The current leader is empty.";
+                log.error(errorStr);
+                return FutureUtil.failedFuture(new 
RestException(Response.Status.PRECONDITION_FAILED, errorStr));
+            }
+            String leaderBrokerId = currentLeaderOpt.get().getBrokerId();
+            LookupOptions lookupOptions = LookupOptions.builder()
+                    
.webServiceAdvertisedListenerName(getWebServiceListenerName()).build();
+            return pulsar().getNamespaceService()
+                    .createLookupResult(leaderBrokerId, false, lookupOptions)
+                    .thenCompose(lookupResult -> {
+                        URI redirectUri = 
lookupResult.toRedirectUri(uri.getRequestUri());
+                        log.debug()
+                                .attr("leaderBrokerId", leaderBrokerId)
+                                .attr("redirectUri", 
redirectUri).log("Redirecting the request call to leader broker");
+                        return FutureUtil.failedFuture(
+                                new 
WebApplicationException(Response.temporaryRedirect(redirectUri).build()));
+                    });
+        });
     }
 
     public CompletableFuture<Void> setNamespaceBundleAffinityAsync(String 
bundleRange, String destinationBroker) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 2e53b54e98f..21f67bd6b36 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -56,10 +56,20 @@ public class LeaderElectionService implements AutoCloseable 
{
         leaderElection.close();
     }
 
+    /**
+     * Authoritative read of the current leader: if a leader election is in 
progress, the returned
+     * future completes once it settles (bounded by the default metadata 
operation timeout). Use
+     * this whenever a decision is made based on who the leader is.
+     */
     public CompletableFuture<Optional<LeaderBroker>> readCurrentLeader() {
         return leaderElection.getLeaderValue();
     }
 
+    /**
+     * Non-blocking snapshot of the current leader; empty while a re-election 
is settling even
+     * though a leader may technically exist. Only suitable for best-effort 
uses such as logging —
+     * decision-making callers must use {@link #readCurrentLeader()}.
+     */
     public Optional<LeaderBroker> getCurrentLeader() {
         return leaderElection.getLeaderValueIfPresent();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fb7c3d68bf6..91632c1ee4c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -59,7 +59,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
@@ -561,7 +560,6 @@ public class NamespaceService implements AutoCloseable {
     private void searchForCandidateBroker(NamespaceBundle bundle,
                                           
CompletableFuture<Optional<LookupResult>> lookupFuture,
                                           LookupOptions options) {
-        String candidateBroker;
         LeaderElectionService les = pulsar.getLeaderElectionService();
         if (les == null) {
             log.warn()
@@ -572,71 +570,102 @@ public class NamespaceService implements AutoCloseable {
             return;
         }
 
-        boolean authoritativeRedirect = les.isLeader();
+        selectCandidateBroker(bundle, options, les)
+                .thenAcceptAsync(selection -> {
+                    if (selection.isEmpty()) {
+                        log.warn()
+                                .attr("namespaceBundle", bundle)
+                                .log("Load manager didn't return any available 
broker. Returning empty result to"
+                                        + " lookup. NamespaceBundle");
+                        lookupFuture.complete(Optional.empty());
+                        return;
+                    }
+                    acquireOwnershipOrRedirect(bundle, options, 
selection.get(), lookupFuture);
+                }, pulsar.getExecutor())
+                .exceptionally(e -> {
+                    log.warn()
+                            .attr("acquire", bundle)
+                            .exception(e)
+                            .log("Error when searching for candidate broker to 
acquire");
+                    
lookupFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
+                    return null;
+                });
+    }
 
-        try {
-            // check if this is Heartbeat or SLAMonitor namespace
-            candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb ->
-                    CompletableFuture.completedFuture(isBrokerActive(cb)))
-                    .get(config.getMetadataStoreOperationTimeoutSeconds(), 
SECONDS);
+    /** The broker selected for a bundle assignment, and whether the redirect 
to it is authoritative. */
+    private record CandidateBrokerSelection(String candidateBroker, boolean 
authoritativeRedirect) { }
 
-            if (candidateBroker == null) {
-                Optional<LeaderBroker> currentLeader = 
pulsar.getLeaderElectionService().getCurrentLeader();
+    private CompletableFuture<Optional<CandidateBrokerSelection>> 
selectCandidateBroker(
+            NamespaceBundle bundle, LookupOptions options, 
LeaderElectionService les) {
+        boolean authoritativeRedirect = les.isLeader();
 
-                if (options.isAuthoritative()) {
-                    // leader broker already assigned the current broker as 
owner
-                    candidateBroker = pulsar.getBrokerId();
-                } else {
+        // check if this is Heartbeat or SLAMonitor namespace
+        return getHeartbeatOrSLAMonitorBrokerId(bundle, cb ->
+                CompletableFuture.completedFuture(isBrokerActive(cb)))
+                .thenComposeAsync(heartbeatOrSlaBroker -> {
+                    if (heartbeatOrSlaBroker != null) {
+                        return completedSelection(heartbeatOrSlaBroker, 
authoritativeRedirect);
+                    }
+                    if (options.isAuthoritative()) {
+                        // leader broker already assigned the current broker 
as owner
+                        return completedSelection(pulsar.getBrokerId(), 
authoritativeRedirect);
+                    }
                     LoadManager loadManager = this.loadManager.get();
-                    boolean makeLoadManagerDecisionOnThisBroker = 
!loadManager.isCentralized() || les.isLeader();
-                    if (!makeLoadManagerDecisionOnThisBroker) {
-                        // If leader is not active, fallback to pick the least 
loaded from current broker loadmanager
+                    if (!loadManager.isCentralized() || les.isLeader()) {
+                        return selectLeastLoadedBroker(bundle);
+                    }
+                    // The load manager decision belongs to the leader: read 
the leader
+                    // authoritatively (waits for an in-progress election to 
settle) instead of
+                    // acting on a possibly-empty snapshot during a leadership 
handoff.
+                    return 
les.readCurrentLeader().thenComposeAsync(currentLeader -> {
                         boolean leaderBrokerActive = currentLeader.isPresent()
                                 && 
isBrokerActive(currentLeader.get().getBrokerId());
-                        if (!leaderBrokerActive) {
-                            makeLoadManagerDecisionOnThisBroker = true;
-                            if (currentLeader.isEmpty()) {
-                                log.warn()
-                                        .attr("namespaceBundle", bundle)
-                                        .log("Leader broker info unavailable."
-                                                + " Using decentralized load"
-                                                + " manager decisions");
-                            } else {
-                                log.warn()
-                                        .attr("broker", currentLeader.get())
-                                        .attr("namespaceBundle", bundle)
-                                        .log("The current leader broker isn't 
active. Handling load manager"
-                                                + " decisions in a 
decentralized way. NamespaceBundle");
-                            }
+                        if (leaderBrokerActive) {
+                            // forward to leader broker to make assignment
+                            return 
completedSelection(currentLeader.get().getBrokerId(), authoritativeRedirect);
                         }
-                    }
-                    if (makeLoadManagerDecisionOnThisBroker) {
-                        Optional<String> availableBroker = 
getLeastLoadedFromLoadManager(bundle);
-                        if (availableBroker.isEmpty()) {
+                        // If leader is not active, fallback to pick the least 
loaded from current broker loadmanager
+                        if (currentLeader.isEmpty()) {
                             log.warn()
                                     .attr("namespaceBundle", bundle)
-                                    .log("Load manager didn't return any 
available broker. Returning empty result to"
-                                            + " lookup. NamespaceBundle");
-                            lookupFuture.complete(Optional.empty());
-                            return;
+                                    .log("Leader broker info unavailable."
+                                            + " Using decentralized load"
+                                            + " manager decisions");
+                        } else {
+                            log.warn()
+                                    .attr("broker", currentLeader.get())
+                                    .attr("namespaceBundle", bundle)
+                                    .log("The current leader broker isn't 
active. Handling load manager"
+                                            + " decisions in a decentralized 
way. NamespaceBundle");
                         }
-                        candidateBroker = availableBroker.get();
-                        authoritativeRedirect = true;
-                    } else {
-                        // forward to leader broker to make assignment
-                        candidateBroker = currentLeader.get().getBrokerId();
-                    }
-                }
-            }
+                        return selectLeastLoadedBroker(bundle);
+                    }, pulsar.getExecutor());
+                }, pulsar.getExecutor());
+    }
+
+    private static CompletableFuture<Optional<CandidateBrokerSelection>> 
completedSelection(
+            String candidateBroker, boolean authoritativeRedirect) {
+        return CompletableFuture.completedFuture(
+                Optional.of(new CandidateBrokerSelection(candidateBroker, 
authoritativeRedirect)));
+    }
+
+    // The decentralized decision is authoritative: this broker picked the 
owner itself.
+    private CompletableFuture<Optional<CandidateBrokerSelection>> 
selectLeastLoadedBroker(NamespaceBundle bundle) {
+        Optional<String> availableBroker;
+        try {
+            availableBroker = getLeastLoadedFromLoadManager(bundle);
         } catch (Exception e) {
-            log.warn()
-                    .attr("acquire", bundle)
-                    .exception(e)
-                    .log("Error when searching for candidate broker to 
acquire");
-            lookupFuture.completeExceptionally(e);
-            return;
+            return CompletableFuture.failedFuture(e);
         }
+        return CompletableFuture.completedFuture(
+                availableBroker.map(broker -> new 
CandidateBrokerSelection(broker, true)));
+    }
 
+    private void acquireOwnershipOrRedirect(NamespaceBundle bundle, 
LookupOptions options,
+                                            CandidateBrokerSelection selection,
+                                            
CompletableFuture<Optional<LookupResult>> lookupFuture) {
+        final String candidateBroker = selection.candidateBroker();
+        final boolean authoritativeRedirect = 
selection.authoritativeRedirect();
         try {
             Objects.requireNonNull(candidateBroker);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 0bdeb4341d9..0bfc626398f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import com.google.common.collect.Sets;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -116,6 +117,9 @@ public class LeaderElectionServiceTest {
                 leaderBrokerReference.get() != null);
         Mockito.when(leaderElectionService.getCurrentLeader())
                 .thenAnswer(invocation -> 
Optional.ofNullable(leaderBrokerReference.get()));
+        Mockito.when(leaderElectionService.readCurrentLeader())
+                .thenAnswer(invocation ->
+                        
CompletableFuture.completedFuture(Optional.ofNullable(leaderBrokerReference.get())));
         leaderElectionServiceReference.set(leaderElectionService);
 
         // broker, webService and leaderElectionService is started, but elect 
not ready;
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java
index 016b7d061d0..4b34815c558 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java
@@ -45,19 +45,30 @@ public interface LeaderElection<T> extends AutoCloseable {
     LeaderElectionState getState();
 
     /**
-     * Get the value set by the elected leader, or empty if there's currently 
no leader.
+     * Get the value set by the elected leader.
+     * <p>
+     * This is the authoritative read: if a leader election is currently in 
progress (e.g. the
+     * previous leader's node was just deleted and the participants are 
re-electing), the returned
+     * future completes once the election has settled, with the newly 
determined leader value. The
+     * future completes exceptionally with a {@link 
java.util.concurrent.TimeoutException} if the
+     * election does not complete within the default metadata operation 
timeout.
+     * <p>
+     * An instance that never participated in the election (no {@link 
#elect(Object)} call) reads
+     * the leader value directly from the metadata store. A closed instance 
does not wait: it
+     * reports an empty leader if it held the leadership when closed, or its 
last known view
+     * otherwise.
      *
      * @return a future that will track the completion of the operation
      */
     CompletableFuture<Optional<T>> getLeaderValue();
 
     /**
-     * Get the value set by the elected leader, or empty if there's currently 
no leader.
+     * Get a non-blocking snapshot of the value set by the elected leader, or 
empty if no leader is
+     * known right now.
      * <p>
-     * The call is non blocking and in certain cases can return 
<code>Optional.empty()</code> even though a leader is
-     * technically elected.
-     *
-     * @return a future that will track the completion of the operation
+     * The snapshot can return <code>Optional.empty()</code> even though a 
leader is technically
+     * elected (for example while a re-election is still settling). Callers 
that need the
+     * authoritative leader must use {@link #getLeaderValue()} instead.
      */
     Optional<T> getLeaderValueIfPresent();
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index ce8167271f5..302d18b574c 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -20,20 +20,18 @@ package org.apache.pulsar.metadata.coordination.impl;
 
 import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
 import java.util.EnumSet;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import lombok.CustomLog;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
-import org.apache.pulsar.metadata.api.MetadataCache;
-import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException;
@@ -52,14 +50,19 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
     private final String path;
     private final MetadataSerde<T> serde;
     private final MetadataStoreExtended store;
-    private final MetadataCache<T> cache;
     private final Consumer<LeaderElectionState> stateChangesListener;
-    private final ScheduledFuture<?> updateCachedValueFuture;
 
     private LeaderElectionState leaderElectionState;
     private Optional<Long> version = Optional.empty();
     private Optional<T> proposedValue;
 
+    // The leader value as known by the election cycle (the leader can only 
change through an
+    // election cycle). Pending while no leader is known — election in 
progress or the leader node
+    // deleted — and completed with the leader value once the election 
settles. Readers of
+    // getLeaderValue() wait on it (bounded by 
leaderElectionCompletionTimeoutSeconds);
+    // getLeaderValueIfPresent() takes a non-blocking snapshot of it.
+    private CompletableFuture<Optional<T>> currentLeaderFuture = new 
CompletableFuture<>();
+
     private final ScheduledExecutorService executor;
     private final FutureUtil.Sequencer<Void> sequencer;
 
@@ -71,16 +74,21 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
 
     private static final int LEADER_ELECTION_RETRY_DELAY_SECONDS = 5;
 
+    // Upper bound for getLeaderValue() waiting on an election that never 
settles, aligned with the
+    // default metadata-store operation timeout (the broker's 
metadataStoreOperationTimeoutSeconds).
+    private volatile int leaderElectionCompletionTimeoutSeconds = 30;
+
+    @VisibleForTesting
+    void setLeaderElectionCompletionTimeoutSeconds(int 
leaderElectionCompletionTimeoutSeconds) {
+        this.leaderElectionCompletionTimeoutSeconds = 
leaderElectionCompletionTimeoutSeconds;
+    }
+
     LeaderElectionImpl(MetadataStoreExtended store, Class<T> clazz, String 
path,
             Consumer<LeaderElectionState> stateChangesListener,
                        ScheduledExecutorService executor) {
         this.path = path;
         this.serde = new 
JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz,
 null));
         this.store = store;
-        MetadataCacheConfig<?> metadataCacheConfig = 
MetadataCacheConfig.builder()
-                .expireAfterWriteMillis(-1L)
-                .build();
-        this.cache = store.getMetadataCache(clazz, metadataCacheConfig);
         this.leaderElectionState = LeaderElectionState.NoLeader;
         this.internalState = InternalState.Init;
         this.stateChangesListener = stateChangesListener;
@@ -88,13 +96,38 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
         this.sequencer = FutureUtil.Sequencer.create();
         store.registerListener(this::handlePathNotification);
         store.registerSessionListener(this::handleSessionNotification);
-        updateCachedValueFuture = 
executor.scheduleWithFixedDelay(this::getLeaderValue,
-                metadataCacheConfig.getRefreshAfterWriteMillis() / 2,
-                metadataCacheConfig.getRefreshAfterWriteMillis(), 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Record the leader value determined by the election cycle, waking up any 
getLeaderValue()
+     * callers waiting for the election to settle.
+     */
+    private synchronized void leaderKnown(Optional<T> leaderValue) {
+        if (currentLeaderFuture.isDone()) {
+            currentLeaderFuture = 
CompletableFuture.completedFuture(leaderValue);
+        } else {
+            currentLeaderFuture.complete(leaderValue);
+        }
+    }
+
+    /**
+     * Mark the leader as unknown (the leader node was deleted) so 
getLeaderValue() callers wait for
+     * the next election cycle to settle instead of observing a stale value.
+     */
+    private synchronized void leaderUnknown() {
+        if (currentLeaderFuture.isDone()) {
+            currentLeaderFuture = new CompletableFuture<>();
+        }
     }
 
     @Override
     public synchronized CompletableFuture<LeaderElectionState> elect(T 
proposedValue) {
+        if (internalState == InternalState.Closed) {
+            // Reopened after close() (e.g. the broker's LeaderElectionService 
is close()d and then
+            // start()ed again): reset so a fresh election cycle runs and 
readers wait for it.
+            leaderElectionState = LeaderElectionState.NoLeader;
+            currentLeaderFuture = new CompletableFuture<>();
+        }
         if (leaderElectionState != LeaderElectionState.NoLeader) {
             return CompletableFuture.completedFuture(leaderElectionState);
         }
@@ -112,12 +145,6 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
             } else {
                 return tryToBecomeLeader();
             }
-        }).thenCompose(leaderElectionState -> {
-            // make sure that the cache contains the current leader
-            // so that getLeaderValueIfPresent works on all brokers
-            cache.refresh(path);
-            return cache.get(path)
-                    .thenApply(__ -> leaderElectionState);
         });
     }
 
@@ -137,6 +164,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
                 log.info().attr("value", existingValue).attr("path", 
path).attr("stat", res.getStat())
                         .log("Keeping the existing value as it's from the same 
session");
                 // The value is still valid because it was created in the same 
session
+                leaderKnown(Optional.of(existingValue));
                 changeState(LeaderElectionState.Leading);
                 return 
CompletableFuture.completedFuture(LeaderElectionState.Leading);
             } else {
@@ -160,6 +188,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
         }
 
         // If the existing value is different, it means there's already 
another leader
+        leaderKnown(Optional.of(existingValue));
         changeState(LeaderElectionState.Following);
         return 
CompletableFuture.completedFuture(LeaderElectionState.Following);
     }
@@ -190,37 +219,19 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
                 .thenAccept(stat -> {
                     synchronized (LeaderElectionImpl.this) {
                         if (internalState == InternalState.ElectionInProgress) {
-                            // Do a get() in order to force a notification later, if the z-node disappears
-                            cache.get(path)
-                                    .thenRun(() -> {
-                                        synchronized (LeaderElectionImpl.this) {
-                                            log.info().attr("path", path).attr("value", value)
-                                                    .log("Acquired leadership");
-                                            internalState = InternalState.LeaderIsPresent;
-                                            if (leaderElectionState != LeaderElectionState.Leading) {
-                                                leaderElectionState = LeaderElectionState.Leading;
-                                                try {
-                                                    stateChangesListener.accept(leaderElectionState);
-                                                } catch (Throwable t) {
-                                                    log.warn().exception(t).log("Exception in state change listener");
-                                                }
-                                            }
-                                            result.complete(leaderElectionState);
-                                        }
-                                    }).exceptionally(ex -> {
-                                        // We fail to do the get(), so clean up the leader election fail the whole
-                                        // operation
-                                        log.warn().attr("path", path).exception(ex)
-                                                .log("Failed to get the current state after acquiring leadership."
-                                                        + " Conditionally deleting current entry.");
-                                        store.delete(path, Optional.of(stat.getVersion()))
-                                                .thenRun(() -> result.completeExceptionally(ex))
-                                                .exceptionally(ex2 -> {
-                                                    result.completeExceptionally(ex2);
-                                                    return null;
-                                                });
-                                        return null;
-                                    });
+                            log.info().attr("path", path).attr("value", value)
+                                    .log("Acquired leadership");
+                            internalState = InternalState.LeaderIsPresent;
+                            leaderKnown(Optional.of(value));
+                            if (leaderElectionState != LeaderElectionState.Leading) {
+                                leaderElectionState = LeaderElectionState.Leading;
+                                try {
+                                    stateChangesListener.accept(leaderElectionState);
+                                } catch (Throwable t) {
+                                    log.warn().exception(t).log("Exception in state change listener");
+                                }
+                            }
+                            result.complete(leaderElectionState);
                         } else {
                             log.info().attr("path", path).attr("value", value).attr("stat", stat)
                                     .log("Leadership was lost. Conditionally deleting entry.");
@@ -259,7 +270,6 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
 
     @Override
     public void close() throws Exception {
-        updateCachedValueFuture.cancel(true);
         try {
             asyncClose().join();
         } catch (CompletionException e) {
@@ -274,6 +284,12 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
         }
 
         internalState = InternalState.Closed;
+        // A closed election reports "no leader" rather than waiting or failing: callers like the
+        // extensible load manager's handleNoChannelOwnerError() key off the resulting
+        // "no channel owner" condition to restart the election.
+        if (!currentLeaderFuture.isDone()) {
+            currentLeaderFuture.complete(Optional.empty());
+        }
 
         if (leaderElectionState != LeaderElectionState.Leading) {
             return CompletableFuture.completedFuture(null);
@@ -283,6 +299,9 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
                 .thenAccept(__ -> {
                             synchronized (LeaderElectionImpl.this) {
                                 leaderElectionState = LeaderElectionState.NoLeader;
+                                // The deleted leader node was ours and a closed instance no longer
+                                // observes elections; don't keep reporting ourselves as leader.
+                                currentLeaderFuture = CompletableFuture.completedFuture(Optional.empty());
                             }
                         }
                 );
@@ -295,12 +314,61 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
 
     @Override
     public CompletableFuture<Optional<T>> getLeaderValue() {
-        return cache.get(path);
+        CompletableFuture<Optional<T>> future;
+        synchronized (this) {
+            if (internalState == InternalState.Init) {
+                // This instance never participated in the election (a pure observer, e.g.
+                // BookKeeper's MetadataDrivers helpers querying the current auditor): there is no
+                // local election cycle to wait for, so the store content is the authoritative
+                // answer.
+                return readLeaderValueFromStore();
+            }
+            future = currentLeaderFuture;
+        }
+        if (future.isDone()) {
+            // Hand out a derived future so callers cannot complete the internal one.
+            return future.thenApply(value -> value);
+        }
+        int timeoutSeconds = leaderElectionCompletionTimeoutSeconds;
+        return FutureUtil.addTimeoutHandling(whenLeaderKnown(future),
+                Duration.ofSeconds(timeoutSeconds), executor,
+                () -> FutureUtil.createTimeoutException(
+                        "Leader election on path " + path + " did not complete within "
+                                + timeoutSeconds + " seconds",
+                        LeaderElectionImpl.class, "getLeaderValue()"));
+    }
+
+    private CompletableFuture<Optional<T>> readLeaderValueFromStore() {
+        return store.get(path).thenApply(optRes -> optRes.map(res -> {
+            try {
+                return serde.deserialize(path, res.getValue(), res.getStat());
+            } catch (Throwable t) {
+                throw new CompletionException(t);
+            }
+        }));
+    }
+
+    // Track the internal future without exposing it: completing/cancelling the returned future
+    // (e.g. by the timeout handling) must not complete the election's own future.
+    private CompletableFuture<Optional<T>> whenLeaderKnown(CompletableFuture<Optional<T>> future) {
+        CompletableFuture<Optional<T>> result = new CompletableFuture<>();
+        future.whenComplete((value, ex) -> {
+            if (ex != null) {
+                result.completeExceptionally(ex);
+            } else {
+                result.complete(value);
+            }
+        });
+        return result;
     }
 
     @Override
     public Optional<T> getLeaderValueIfPresent() {
-        return cache.getIfCached(path);
+        CompletableFuture<Optional<T>> future;
+        synchronized (this) {
+            future = currentLeaderFuture;
+        }
+        return future.isDone() && !future.isCompletedExceptionally() ? future.join() : Optional.empty();
     }
 
     private void handleSessionNotification(SessionEvent event) {
@@ -338,6 +406,9 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
                 }
 
                 leaderElectionState = LeaderElectionState.NoLeader;
+                // The leader is unknown until the re-election below settles; getLeaderValue()
+                // callers wait for it instead of observing the stale value.
+                leaderUnknown();
 
                 if (proposedValue.isPresent()) {
                     elect()
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 146e4f0dd58..47d5e9f1f79 100644
--- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -143,12 +143,16 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
         }
         BookieId currentAuditor = main1.auditorElector.getCurrentAuditor();
         assertNotNull(currentAuditor);
-        Auditor auditor1 = main1.auditorElector.getAuditor();
         assertEquals("Current Auditor should be AR1", currentAuditor, BookieImpl.getBookieId(confByIndex(0)));
+        // getCurrentAuditor() can resolve as soon as the election settles, before the elector
+        // thread has constructed the Auditor instance — re-read getAuditor() on every poll instead
+        // of capturing a possibly-null reference once.
         Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
-            assertNotNull(auditor1);
-            assertTrue("Auditor of AR1 should be running", auditor1.isRunning());
+            Auditor a1 = main1.auditorElector.getAuditor();
+            assertNotNull(a1);
+            assertTrue("Auditor of AR1 should be running", a1.isRunning());
         });
+        Auditor auditor1 = main1.auditorElector.getAuditor();
 
 
         /*
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
index 4b48f3c20b0..cb269537383 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pulsar.metadata;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -35,6 +38,7 @@ import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
 public class LeaderElectionTest extends BaseMetadataStoreTest {
@@ -284,4 +288,155 @@ public class LeaderElectionTest extends BaseMetadataStoreTest {
         assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
         assertEqualsAndRetry(() -> le.getLeaderValueIfPresent(), Optional.of("test-1"), Optional.empty());
     }
+
+    @Test(dataProvider = "impl", timeOut = 30000)
+    public void readsDoNotObserveEmptyLeaderDuringReElection(String provider, Supplier<String> urlSupplier)
+            throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+        String path = newKey();
+
+        @Cleanup
+        CoordinationService cs = new CoordinationServiceImpl(store);
+
+        @Cleanup
+        LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> {
+        });
+
+        assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading);
+
+        // Externally delete the leader node: the instance re-elects itself. An authoritative read
+        // issued during the churn either returns the last settled value or waits for the
+        // re-election to settle — it never observes an empty leader.
+        store.delete(path, Optional.empty()).join();
+        assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(le.getState(), LeaderElectionState.Leading);
+            assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1"));
+        });
+    }
+
+    @Test(dataProvider = "zkImpls", timeOut = 30000)
+    public void followerReadsResolveToTheNewLeaderAfterHandoff(String provider, Supplier<String> urlSupplier)
+            throws Exception {
+        @Cleanup
+        MetadataStoreExtended store1 = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+        @Cleanup
+        MetadataStoreExtended store2 = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        String path = newKey();
+
+        @Cleanup
+        CoordinationService cs1 = new CoordinationServiceImpl(store1);
+        @Cleanup
+        CoordinationService cs2 = new CoordinationServiceImpl(store2);
+
+        @Cleanup
+        LeaderElection<String> le1 = cs1.getLeaderElection(String.class, path, __ -> {
+        });
+        @Cleanup
+        LeaderElection<String> le2 = cs2.getLeaderElection(String.class, path, __ -> {
+        });
+
+        assertEquals(le1.elect("test-1").join(), LeaderElectionState.Leading);
+        assertEquals(le2.elect("test-2").join(), LeaderElectionState.Following);
+        assertEquals(le2.getLeaderValue().join(), Optional.of("test-1"));
+
+        // The leader hands off: le2 re-elects itself. Authoritative reads during the handoff
+        // return one of the settled leader values and converge to the new leader, but never
+        // observe an empty leader.
+        le1.close();
+        List<Optional<String>> observed = new CopyOnWriteArrayList<>();
+        Awaitility.await().untilAsserted(() -> {
+            Optional<String> leader = le2.getLeaderValue().join();
+            observed.add(leader);
+            assertEquals(leader, Optional.of("test-2"));
+        });
+        assertThat(observed)
+                .as("authoritative reads during the leadership handoff")
+                .doesNotContain(Optional.empty());
+    }
+
+    @Test(dataProvider = "impl", timeOut = 30000)
+    public void closedLeaderReportsEmptyLeader(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+        String path = newKey();
+
+        @Cleanup
+        CoordinationService cs = new CoordinationServiceImpl(store);
+
+        LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> {
+        });
+
+        assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading);
+        assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
+
+        // Closing the leader releases the leadership; reads on the closed instance must report an
+        // empty leader without waiting (recovery paths key off the "no leader" condition).
+        le.close();
+        assertEquals(le.getLeaderValue().join(), Optional.empty());
+        assertEquals(le.getLeaderValueIfPresent(), Optional.empty());
+    }
+
+    @Test(dataProvider = "impl", timeOut = 30000)
+    public void electAfterCloseRunsANewElection(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+        String path = newKey();
+
+        @Cleanup
+        CoordinationService cs = new CoordinationServiceImpl(store);
+
+        @Cleanup
+        LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> {
+        });
+
+        assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading);
+        le.close();
+
+        // Re-electing on a closed instance reopens it (the broker's LeaderElectionService is
+        // close()d and start()ed again to force a leadership change).
+        assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading);
+        assertEquals(le.getLeaderValue().join(), Optional.of("test-1"));
+        assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1"));
+    }
+
+    @Test(dataProvider = "impl", timeOut = 30000)
+    public void observerReadsLeaderValueFromStore(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+        String path = newKey();
+
+        @Cleanup
+        CoordinationService cs = new CoordinationServiceImpl(store);
+        @Cleanup
+        CoordinationService observerCs = new CoordinationServiceImpl(store);
+
+        @Cleanup
+        LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> {
+        });
+        // The observer never calls elect(): there is no local election cycle to wait for, so the
+        // authoritative read goes directly to the metadata store, while the snapshot stays empty.
+        @Cleanup
+        LeaderElection<String> observer = observerCs.getLeaderElection(String.class, path, __ -> {
+        });
+
+        assertEquals(observer.getLeaderValue().join(), Optional.empty());
+
+        assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading);
+        assertEquals(observer.getLeaderValue().join(), Optional.of("test-1"));
+        assertEquals(observer.getLeaderValueIfPresent(), Optional.empty());
+    }
 }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
index 09c9d71c41a..4827c4a197a 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java
@@ -18,7 +18,14 @@
  */
 package org.apache.pulsar.metadata.coordination.impl;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import org.apache.pulsar.metadata.BaseMetadataStoreTest;
@@ -62,4 +69,25 @@ public class LeaderElectionImplTest extends BaseMetadataStoreTest {
         });
         blockFuture.join();
     }
+
+    @Test(timeOut = 20000)
+    public void getLeaderValueTimesOutWhenElectionNeverCompletes() {
+        MetadataStoreExtended store = mock(MetadataStoreExtended.class);
+        // The store never answers, so the election never settles.
+        when(store.get(anyString())).thenReturn(new CompletableFuture<>());
+
+        @Cleanup("shutdownNow")
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        LeaderElectionImpl<String> le = new LeaderElectionImpl<>(store, String.class,
+                "/getLeaderValueTimesOutWhenElectionNeverCompletes", __ -> {
+        }, executor);
+        le.setLeaderElectionCompletionTimeoutSeconds(1);
+
+        le.elect("test-1");
+
+        assertThatThrownBy(() -> le.getLeaderValue().join())
+                .hasCauseInstanceOf(TimeoutException.class)
+                .cause()
+                .hasMessageContaining("did not complete within");
+    }
 }

Reply via email to