lhotari commented on PR #25929:
URL: https://github.com/apache/pulsar/pull/25929#issuecomment-4625041554

   These are the findings of a local Claude Code review. Please check whether 
these need to be addressed.
   
   ### 1. [BUG] Follower-broker snapshot push races the leader-value cache; no 
re-push → client stuck on a dead leader indefinitely
   
   `TransactionCoordinatorV5.buildAssignmentsSnapshot` + `onElectionStateChange`
   
   Trace for a 3+ broker cluster (A leads partition N; B and C follow; the 
client's watch is on C):
   
   - A dies → `Deleted` notification. `MetadataCacheImpl.accept` 
**invalidates** C's cache entry (`MetadataCacheImpl.java:378-379`). 
`LeaderElectionImpl.handlePathNotification` silently sets `NoLeader` and 
re-elects.
   - C loses to B → `changeState(Following)` fires C's listener **synchronously 
inside** `handleExistingLeaderValue` (`LeaderElectionImpl.java:163`) — *before* 
the `cache.refresh(path)` / `cache.get(path)` continuation in `elect()` 
(`LeaderElectionImpl.java:115-121`) repopulates the cache (that's a 
metadata-store RTT away).
   - The `ServerCnx` listener task (`ctx.executor().execute(...)`, 
sub-millisecond) builds the snapshot via `getLeaderValueIfPresent()` = 
`cache.getIfCached(path)` → **empty** → partition N is omitted from the pushed 
snapshot. Note that `MetadataCacheImpl.refresh()` is `computeIfPresent`-only 
(`MetadataCacheImpl.java:358-361`), so the `Created` notification for B's node 
does not repopulate an invalidated entry either.
   - There is **no subsequent trigger**: the cache repopulating fires no TC 
listener, the broker has no periodic/delayed re-push, and the client never 
re-polls. The client's handler for partition N keeps reconnect-looping against 
dead broker A "until the next snapshot" — which only arrives if some *other* 
leadership change happens later.
   
   The PR description's claim that the client parks a mid-election partition 
"until a later snapshot fills it in" is therefore not upheld — the later 
snapshot may never come.
   
   The 2-broker integration test structurally masks this 
(`PulsarClusterSpec.numBrokers = 2`): the survivor transitions 
*Following→Leading* for every moved partition, and in the Leading path 
`tryToBecomeLeader` populates the cache *before* firing the listener 
(`LeaderElectionImpl.java:194-208`), so its snapshots are always correct.
   
   Suggested fixes: build the snapshot from `getLeaderValue()` (async, loads on 
miss) instead of `getLeaderValueIfPresent()`, and/or schedule a debounced 
re-push shortly after any change, and/or re-push whenever a snapshot was 
incomplete.
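
   A minimal sketch of the first and third suggestions, under stated 
assumptions: `SnapshotSketch`, `buildSnapshot`, `isIncomplete` and the 
`leaderLookup` parameter are hypothetical names, and `leaderLookup` only models 
an async, load-on-miss lookup in the spirit of `getLeaderValue()`. It is not the 
PR's code, just an illustration of waiting for every partition's lookup before 
pushing and of detecting a snapshot that still has a gap.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

// Hypothetical stand-in for the snapshot builder; not the PR's actual code.
final class SnapshotSketch {

    // leaderLookup models an async, load-on-miss lookup (assumption).
    static CompletableFuture<Map<Integer, String>> buildSnapshot(
            int parallelism,
            IntFunction<CompletableFuture<Optional<String>>> leaderLookup) {
        Map<Integer, CompletableFuture<Optional<String>>> pending = new TreeMap<>();
        for (int p = 0; p < parallelism; p++) {
            pending.put(p, leaderLookup.apply(p));
        }
        // Wait for every lookup instead of reading whatever happens to be cached.
        return CompletableFuture
                .allOf(pending.values().toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    Map<Integer, String> snapshot = new TreeMap<>();
                    pending.forEach((p, f) ->
                            f.join().ifPresent(url -> snapshot.put(p, url)));
                    return snapshot;
                });
    }

    // True when at least one partition has no known leader, so a later
    // re-push would be needed to fill the gap.
    static boolean isIncomplete(Map<Integer, String> snapshot, int parallelism) {
        return snapshot.size() < parallelism;
    }
}
```

   A caller could push the result and, when `isIncomplete` returns true, 
schedule another build after a short delay rather than waiting for an unrelated 
leadership change.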
   
   ### 2. [BUG] Two paths where the client watch dies silently with no reconnect
   
   `WatchTcAssignmentsDiscovery`
   
   - `attach()`: on a *re*connect (initial snapshot already received), if the 
newly-connected broker doesn't advertise `supports_tc_metadata_discovery` (e.g. 
per-broker config drift on `transactionCoordinatorScalableTopicsEnabled`), 
`initialSnapshotFuture.completeExceptionally(...)` is a no-op on the 
already-completed future and the method returns — no `scheduleReconnect()`. 
Discovery is permanently frozen on the last snapshot.
   - `onError()`: post-initial errors only log. 
`ClientCnx.handleCommandWatchTcAssignmentsUpdate` has already **removed the 
session** on error, so no `connectionClosed()` will ever fire for it either. A 
broker answering `NotAllowedError` / `ServiceNotReady` after a reconnect kills 
the watch forever.
   
   Both leave all handlers pinned to stale leaders with no recovery. Combined 
with finding 1, eventual consistency of the assignment map is not guaranteed.
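
   Both paths share one root cause: once the initial snapshot has arrived, 
failing `initialSnapshotFuture` does nothing. The sketch below models that with 
a hypothetical `WatchFailureSketch` class; only the field name 
`initialSnapshotFuture` and the method name `scheduleReconnect` come from the 
review, and their bodies here are assumptions. It relies on 
`CompletableFuture.completeExceptionally` returning `false` when the future was 
already completed.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the watch's failure handling; bodies are assumed.
final class WatchFailureSketch {
    final CompletableFuture<Void> initialSnapshotFuture = new CompletableFuture<>();
    int reconnectsScheduled = 0;

    // Would be called from both attach() (feature not advertised) and onError().
    void onWatchFailure(Throwable cause) {
        // Returns false when the future was completed earlier, which is the
        // reconnect case where the failure would otherwise be dropped silently.
        boolean failedInitialOpen = initialSnapshotFuture.completeExceptionally(cause);
        if (!failedInitialOpen) {
            scheduleReconnect();
        }
    }

    void scheduleReconnect() {
        reconnectsScheduled++; // placeholder for a backoff-delayed re-open
    }
}
```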
   
   ### 3. [BUG] Initial watch open is one-shot — a transient failure hard-fails 
transaction client startup
   
   `TransactionCoordinatorClientImpl.selectDiscovery` + 
`WatchTcAssignmentsDiscovery.start`
   
   The probe (`getConnectionToServiceUrl()`) and the watch 
(`getAnyBrokerProxyConnection()`) can land on *different* brokers. If the watch 
broker rejects the open — e.g. `ServiceNotReady` while its TC is still 
initializing, or the feature flag is off there due to config drift — `attach()` 
/ `onError()` fail `initialSnapshotFuture` → `startAsync()` fails outright. 
There is no retry for retryable errors, so transaction-enabled client startup 
can fail nondeterministically. Consider retrying the watch open (with the 
existing `reconnectBackoff`) for retryable failures instead of failing startup.
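
   One possible shape for such a retry, as a sketch only: `RetryOpenSketch` and 
`openWithRetry` are hypothetical, the doubling delay merely stands in for the 
existing `reconnectBackoff`, and which errors count as retryable is left to the 
caller-supplied predicate.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper; not part of the Pulsar client API.
final class RetryOpenSketch {

    static <T> CompletableFuture<T> openWithRetry(
            Supplier<CompletableFuture<T>> open,
            Predicate<Throwable> retryable,
            int attemptsLeft,
            long delayMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        open.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // Unwrap the wrapper added by dependent stages, if any.
            Throwable cause = error instanceof CompletionException && error.getCause() != null
                    ? error.getCause() : error;
            if (attemptsLeft <= 1 || !retryable.test(cause)) {
                result.completeExceptionally(cause);
                return;
            }
            // Retry after a delay, doubling it each time (simplified backoff).
            CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS).execute(() ->
                    openWithRetry(open, retryable, attemptsLeft - 1, delayMs * 2)
                            .whenComplete((v, e) -> {
                                if (e == null) {
                                    result.complete(v);
                                } else {
                                    result.completeExceptionally(e);
                                }
                            }));
        });
        return result;
    }
}
```

   With this shape, startup only fails once the attempts are exhausted or the 
error is classified as non-retryable.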
   
   ### 4. [QUALITY] `new URI(null)` NPE for a non-TLS client when a leader 
advertises only a TLS URL
   
   `TransactionMetaStoreHandler` ctor / `retargetLeader`
   
   `HandlerState.setRedirectedClusterURI` picks `serviceUrl` when `!isUseTls()` 
(`HandlerState.java:56-59`); `TcAssignment.broker_service_url` is optional and 
the snapshot delivers `null` for TLS-only brokers. `new URI(null)` throws 
**NPE**, not `URISyntaxException` — so the constructor's catch doesn't wrap it 
and `retargetLeader`'s catch doesn't swallow it. It propagates out of 
`handlers.compute` inside `onSnapshot` on the netty read thread → channel 
teardown → reconnect → repeat. A misconfigured client, but the failure mode 
should be a clear error, not an exception loop.
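
   A sketch of a null guard, assuming a hypothetical helper `LeaderUriSketch` 
with a `resolve` method (neither exists in the PR): it selects the URL matching 
the client's TLS mode and returns an empty `Optional` instead of calling 
`new URI(...)` with `null`, so the caller can log one clear error and skip the 
retarget.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;

// Hypothetical helper; parameter names are illustrative.
final class LeaderUriSketch {

    static Optional<URI> resolve(boolean useTls, String serviceUrl, String serviceUrlTls) {
        String selected = useTls ? serviceUrlTls : serviceUrl;
        if (selected == null) {
            // The leader advertises no URL for this client's TLS mode.
            return Optional.empty();
        }
        try {
            return Optional.of(new URI(selected));
        } catch (URISyntaxException e) {
            // A malformed URL is treated like a missing one in this sketch.
            return Optional.empty();
        }
    }
}
```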
   
   ### 5. [QUALITY] `transactionCoordinatorScalableTopicsParallelism` has no 
validation or cross-broker consistency enforcement
   
   `ServiceConfiguration.java`
   
   No `minValue = 1` on the `@FieldContext` (other int configs set it; 
`0`/negative silently yields a coordinator that leads nothing and 
`nextHandler() == null`). More importantly, the doc says "Fixed at cluster 
bring-up," but nothing enforces agreement: brokers with different values run 
different election sets, and the snapshot's `parallelism` flip-flops depending 
on which broker serves the watch. Consider persisting the value in the metadata 
store on first start and failing/warning on mismatch.
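
   The two checks could look roughly like the sketch below. 
`ParallelismCheckSketch`, `requirePositive` and `reconcile` are hypothetical 
names, and the `persisted` argument stands in for a value read from the metadata 
store, which is not shown.

```java
import java.util.Optional;

// Hypothetical validation helper; not part of ServiceConfiguration.
final class ParallelismCheckSketch {

    // Equivalent in spirit to minValue = 1 on the config field.
    static int requirePositive(int configured) {
        if (configured < 1) {
            throw new IllegalArgumentException(
                    "parallelism must be >= 1, got " + configured);
        }
        return configured;
    }

    // persisted is empty on the first broker start (assumption); afterwards it
    // holds the value the cluster was brought up with.
    static int reconcile(Optional<Integer> persisted, int configured) {
        int local = requirePositive(configured);
        if (persisted.isPresent() && persisted.get() != local) {
            throw new IllegalStateException("parallelism mismatch: cluster uses "
                    + persisted.get() + ", this broker is configured with " + local);
        }
        return local;
    }
}
```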
   
   ### 6. [QUALITY] The integration test can't prove the new path is exercised
   
   `TcMetadataDiscoveryTest`
   
   If `selectDiscovery`'s probe transiently fails, it *silently* falls back to 
`AssignTopicTcDiscovery` — and both tests still pass, since the assign topic is 
initialized with the same partition count (`TC_PARALLELISM`). Consider 
asserting the selected discovery type (e.g. via the `"Transaction coordinator 
discovery selected"` log line or a test hook); otherwise a regression that 
breaks the watch path entirely could go green in CI.
   
   ### 7. [QUALITY] Minor notes
   
   - `AssignTopicTcDiscovery.handlers()` reads the `handlers` field twice 
(`handlers == null ? List.of() : List.of(handlers)`), so a concurrent `close()` 
between the two reads causes an NPE. Test-only surface, trivial to fix with a 
local variable.
   - On broker startup with 16 partitions, each completed election fires every 
watcher listener, so a connecting client may receive a burst of ~16 full 
snapshots. Bounded and harmless, but a debounce (which finding 1 wants anyway) 
would also fix this.
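
   The local-variable fix for the first note, sketched with a hypothetical 
`HandlersSketch` class; the `String[]` element type and the `close()` body are 
assumptions made to keep the example self-contained.

```java
import java.util.List;

// Hypothetical stand-in for the discovery class; element type is assumed.
final class HandlersSketch {
    private volatile String[] handlers = {"h0", "h1"};

    List<String> handlers() {
        // Read the field once so a concurrent close() cannot null it out
        // between the check and the use.
        String[] local = handlers;
        return local == null ? List.of() : List.of(local);
    }

    void close() {
        handlers = null;
    }
}
```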
   

