lhotari opened a new pull request, #26000:
URL: https://github.com/apache/pulsar/pull/26000

   Fixes #25999
   
   ### Motivation
   
   When the leader is lost — the elected broker shuts down, crashes, loses its 
metadata session, or hands off leadership — the leader's ephemeral node is 
deleted and the remaining participants re-elect. While that re-election is 
settling, there is currently no authoritative way to ask "who is the leader":
   
   - `LeaderElectionService.getCurrentLeader()` → 
`LeaderElection.getLeaderValueIfPresent()` returns `Optional.empty()` even 
though a new leader is being (or has been) elected: the cache entry backing it 
is invalidated by the `Deleted` notification and is only repopulated by the 
next loading read — in the worst case by the periodic refresh task, up to 5 
minutes later.
   - `LeaderElectionService.readCurrentLeader()` → 
`LeaderElection.getLeaderValue()` is just `cache.get(path)`, so it reflects 
whatever the cache/store has at that instant, not the settled election outcome.
   
   Several call sites make decisions on these reads and misbehave during a 
leadership handoff: `BrokersBase.getLeaderBroker` returns 404 although a leader 
exists, `NamespacesBase.validateLeaderBrokerAsync` fails with 412 instead of 
redirecting to the leader, `NamespaceService.searchForCandidateBroker` falls 
back to decentralized load-manager decisions on several brokers concurrently, 
and the extensible load manager's channel-owner resolution surfaces "There is 
no channel owner now" errors that trigger recovery churn.
   
   The root cause is that `LeaderElectionImpl` keeps the leader value in a 
`MetadataCache` whose lifecycle is disconnected from the election state 
machine. The cache is unnecessary: the leader can only change through an 
election cycle, so the state machine always knows the leader value at the 
moments it changes. The cache's historical second purpose — registering the 
metadata watch via a `get()` — is obsolete now that `ZKMetadataStore` uses a 
persistent recursive watch.
   
   ### Modifications
   
   **`pulsar-metadata` — keep the leader value in the election cycle and make 
`getLeaderValue()` authoritative:**
   
   - `LeaderElectionImpl` tracks the current leader in a `currentLeaderFuture` 
updated by the election cycle: completed when the election settles (`Leading` 
with the proposed value, `Following` with the observed existing value), reset 
to pending when the leader node is deleted. The `MetadataCache` and the 
periodic refresh task are removed.
   - `getLeaderValue()` / `readCurrentLeader()` is the authoritative read: it 
waits for an in-progress election to settle, bounded by the default metadata 
operation timeout (30s), and fails with a `TimeoutException` if the election 
has not settled by then. An instance that never participated in the election 
(a pure observer, e.g. BookKeeper's `MetadataDrivers` helpers querying the 
auditor) reads the store directly.
   - `getLeaderValueIfPresent()` / `getCurrentLeader()` is an explicitly 
documented non-blocking snapshot, suitable only for best-effort uses.
   - A closed instance does not wait: a closed leader reports an empty leader 
(the extensible load manager's `handleNoChannelOwnerError` recovery keys off 
the "no channel owner" condition), and `elect()` after `close()` reopens the 
instance (the `LeaderElectionService` `close()`+`start()` pattern used to force 
a leadership change).
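
The future-based tracking described in the list above can be illustrated with a minimal, self-contained sketch. It is not the `LeaderElectionImpl` code: the class `LeaderTrackerSketch` and its methods `electionSettled` and `leaderNodeDeleted` are hypothetical names chosen for illustration, and the observer fallback and close/reopen handling are left out. It only shows the core idea of a future that is completed when the election settles, reset to pending on deletion, and read with a bounded wait.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; this is not the LeaderElectionImpl source.
class LeaderTrackerSketch<T> {
    private final long timeoutMillis;

    // Pending while an election is in progress, completed once it settles.
    private CompletableFuture<Optional<T>> currentLeaderFuture = new CompletableFuture<>();

    LeaderTrackerSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called when the election settles (as leader or as follower).
    synchronized void electionSettled(T leaderValue) {
        if (currentLeaderFuture.isDone()) {
            // A previous result exists; start from a fresh future.
            currentLeaderFuture = new CompletableFuture<>();
        }
        currentLeaderFuture.complete(Optional.of(leaderValue));
    }

    // Called when the leader node is deleted: readers must wait again.
    synchronized void leaderNodeDeleted() {
        if (currentLeaderFuture.isDone()) {
            currentLeaderFuture = new CompletableFuture<>();
        }
    }

    // Authoritative read: waits for the election to settle, bounded by a timeout.
    // copy() keeps the timeout from completing the shared future exceptionally.
    synchronized CompletableFuture<Optional<T>> getLeaderValue() {
        return currentLeaderFuture.copy().orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Non-blocking snapshot: empty while an election is still in progress.
    synchronized Optional<T> getLeaderValueIfPresent() {
        return currentLeaderFuture.getNow(Optional.empty());
    }
}
```

In this sketch a reader that calls `getLeaderValue()` during a re-election receives the new leader once `electionSettled` runs, while `getLeaderValueIfPresent()` reports empty in the same window, which mirrors the distinction between the authoritative read and the snapshot.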
   
   **`pulsar-broker` — use the authoritative read where leader decisions are 
made:**
   
   - `BrokersBase.getLeaderBroker` and 
`NamespacesBase.validateLeaderBrokerAsync` use `readCurrentLeader()`.
   - `NamespaceService.searchForCandidateBroker` is converted to a fully 
asynchronous flow (`selectCandidateBroker` + `acquireOwnershipOrRedirect`) so 
it can use `readCurrentLeader()`; the blocking `get()` on the 
heartbeat/SLA-monitor check is removed, and the decentralized-fallback and 
`authoritativeRedirect` semantics are preserved.
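
As a rough illustration of what removing a blocking `get()` in favour of a composed asynchronous flow looks like, the sketch below chains a preliminary check, a leader read, and a fallback with `thenCompose`. Everything in it is an assumption made for illustration: `CandidateSelectionSketch`, `selectCandidate`, and the three parameters are hypothetical stand-ins, and the real `selectCandidateBroker` / `acquireOwnershipOrRedirect` logic in `NamespaceService` has more branches than shown here.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration only; names and branching are simplified assumptions.
class CandidateSelectionSketch {

    static CompletableFuture<String> selectCandidate(
            // Stand-in for the heartbeat/SLA-monitor check: a broker if the namespace is pinned.
            CompletableFuture<Optional<String>> pinnedBrokerCheck,
            // Stand-in for an authoritative, asynchronous leader read.
            Supplier<CompletableFuture<Optional<String>>> readCurrentLeader,
            // Stand-in for the decentralized fallback used when no leader is reported.
            Supplier<String> decentralizedFallback) {
        // No thread blocks here: each step runs when the previous future completes.
        return pinnedBrokerCheck.thenCompose(pinned -> {
            if (pinned.isPresent()) {
                return CompletableFuture.completedFuture(pinned.get());
            }
            return readCurrentLeader.get()
                    .thenApply(leader -> leader.orElseGet(decentralizedFallback));
        });
    }
}
```

The leader read is passed as a `Supplier` so that it is only started when the preliminary check does not already determine the candidate.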
   
   **Tests:**
   
   - New `LeaderElectionTest` cases: authoritative reads never observe an empty 
leader during a re-election (single-instance external delete, and a two-member 
leadership handoff converging to the new leader); a closed leader reports an 
empty leader; `elect()` after `close()` runs a new election; a pure observer 
reads the leader value from the store while its snapshot stays empty.
   - New `LeaderElectionImplTest` case: `getLeaderValue()` fails with a 
`TimeoutException` when the election never settles (via a `@VisibleForTesting` 
override of the completion timeout).
   - `LeaderElectionServiceTest`'s mock now stubs `readCurrentLeader()`; 
`AutoRecoveryMainTest` re-reads `getAuditor()` inside its await instead of 
capturing a possibly-null reference once (the promptly-resolving read exposed 
the existing race).
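
The `AutoRecoveryMainTest` adjustment follows a general pattern: re-evaluate the reference inside the polled condition rather than capturing it once before waiting. The sketch below shows that pattern with a small hand-written polling helper; `AwaitSketch.await` is a hypothetical helper written for this example, and an `AtomicReference` stands in for the value returned by `getAuditor()`. It is not the test's actual code.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical illustration only; a minimal polling helper for the example.
class AwaitSketch {

    // Polls the condition until it holds or the timeout elapses.
    static boolean await(BooleanSupplier condition, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean();
    }

    // Racy: the reference is read once and may still be null at that moment,
    // so the condition can never become true.
    static boolean racyWait(AtomicReference<String> auditorRef, long timeoutMillis) {
        String captured = auditorRef.get();
        return await(() -> captured != null, timeoutMillis);
    }

    // Fixed: the reference is re-read on every poll.
    static boolean rereadWait(AtomicReference<String> auditorRef, long timeoutMillis) {
        return await(() -> auditorRef.get() != null, timeoutMillis);
    }
}
```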
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
   - New `LeaderElectionTest` and `LeaderElectionImplTest` cases described 
above cover the wait-for-election, never-empty-during-handoff, timeout, close, 
reopen, and observer semantics.
   - Existing coverage validated locally: `LeaderElectionTest`, 
`LeaderElectionImplTest`, `LeaderElectionServiceTest`, 
`MultiBrokerLeaderElectionTest`, `NamespaceServiceTest`, 
`ModularLoadManagerImplTest`, `AdminApiMultiBrokersTest`, `AdminTest`, 
`AutoRecoveryMainTest`, `AuditorRollingRestartTest`, 
`TransactionCoordinatorV5Test`, `ExtensibleLoadManagerImplTest`, and the full 
`pulsar-metadata` suite.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [x] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   The `LeaderElection.getLeaderValue()` contract in `pulsar-metadata` changes 
from "read the cached store value" to "wait for an in-progress election to 
settle (bounded by the default metadata operation timeout)"; 
`getLeaderValueIfPresent()` is now documented as a non-blocking snapshot. All 
in-tree callers participate in the election or are handled by the observer 
fallback, and `GET /brokers/leaderBroker` / leader redirects now succeed during 
a leadership handoff instead of failing with 404/412.
   

