lhotari commented on PR #25929: URL: https://github.com/apache/pulsar/pull/25929#issuecomment-4625041554
These are the findings of a local Claude Code review. Please check whether these need to be addressed.

### 1. [BUG] Follower-broker snapshot push races the leader-value cache; no re-push → client stuck on a dead leader indefinitely

`TransactionCoordinatorV5.buildAssignmentsSnapshot` + `onElectionStateChange`

Trace for a 3+ broker cluster (A leads partition N; B and C follow; the client's watch is on C):

- A dies → `Deleted` notification. `MetadataCacheImpl.accept` **invalidates** C's cache entry (`MetadataCacheImpl.java:378-379`). `LeaderElectionImpl.handlePathNotification` silently sets `NoLeader` and re-elects.
- C loses to B → `changeState(Following)` fires C's listener **synchronously inside** `handleExistingLeaderValue` (`LeaderElectionImpl.java:163`), *before* the `cache.refresh(path)` / `cache.get(path)` continuation in `elect()` (`LeaderElectionImpl.java:115-121`) repopulates the cache (that continuation is a metadata-store RTT away).
- The `ServerCnx` listener task (`ctx.executor().execute(...)`, sub-millisecond) builds the snapshot via `getLeaderValueIfPresent()` = `cache.getIfCached(path)` → **empty** → partition N is omitted from the pushed snapshot. Note that `MetadataCacheImpl.refresh()` is `computeIfPresent`-only (`MetadataCacheImpl.java:358-361`), so the `Created` notification for B's node does not repopulate an invalidated entry either.
- There is **no subsequent trigger**: the cache repopulating fires no TC listener, the broker has no periodic/delayed re-push, and the client never re-polls. The client's handler for partition N keeps reconnect-looping against dead broker A "until the next snapshot", which only arrives if some *other* leadership change happens later.

The PR description's claim that the client parks a mid-election partition "until a later snapshot fills it in" is therefore not upheld: the later snapshot may never come.
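To make the trace concrete, here is a toy, single-process sketch of the race and of one of the fixes suggested for this finding (building the snapshot with a load-on-miss lookup instead of a cache-only peek). `ToyLeaderCache`, `SnapshotBuilder`, and the `/tc-leader/<partition>` paths are hypothetical stand-ins, not Pulsar's `MetadataCacheImpl` or `TransactionCoordinatorV5` APIs; the loader function plays the role of the metadata-store read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical toy cache (not Pulsar's MetadataCacheImpl): cached values plus a
// loader that reads the "metadata store" on a miss.
final class ToyLeaderCache {
    private final Map<String, String> cached = new ConcurrentHashMap<>();
    private final Function<String, CompletableFuture<Optional<String>>> loader;

    ToyLeaderCache(Function<String, CompletableFuture<Optional<String>>> loader) {
        this.loader = loader;
    }

    // Cache-only peek, in the spirit of getIfCached(path): never touches the store.
    Optional<String> getIfCached(String path) {
        return Optional.ofNullable(cached.get(path));
    }

    // Load-on-miss lookup, in the spirit of an async get(path).
    CompletableFuture<Optional<String>> get(String path) {
        String hit = cached.get(path);
        if (hit != null) {
            return CompletableFuture.completedFuture(Optional.of(hit));
        }
        return loader.apply(path).thenApply(value -> {
            value.ifPresent(v -> cached.put(path, v)); // repopulate the entry on load
            return value;
        });
    }

    void put(String path, String leader) {
        cached.put(path, leader);
    }

    // Models the Deleted notification invalidating the entry.
    void invalidate(String path) {
        cached.remove(path);
    }
}

// Hypothetical snapshot builders contrasting the two lookups.
final class SnapshotBuilder {
    static String path(int partition) {
        return "/tc-leader/" + partition; // hypothetical path layout
    }

    // Racy variant: a partition whose entry was just invalidated is silently omitted.
    static Map<Integer, String> buildFromCacheOnly(ToyLeaderCache cache, int parallelism) {
        Map<Integer, String> snapshot = new TreeMap<>();
        for (int p = 0; p < parallelism; p++) {
            final int partition = p;
            cache.getIfCached(path(p)).ifPresent(leader -> snapshot.put(partition, leader));
        }
        return snapshot;
    }

    // Load-on-miss variant: the snapshot is built only after every lookup completes.
    static CompletableFuture<Map<Integer, String>> buildLoadingOnMiss(ToyLeaderCache cache, int parallelism) {
        List<CompletableFuture<Optional<String>>> lookups = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            lookups.add(cache.get(path(p)));
        }
        return CompletableFuture.allOf(lookups.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Map<Integer, String> snapshot = new TreeMap<>();
                    for (int p = 0; p < parallelism; p++) {
                        final int partition = p;
                        lookups.get(p).join().ifPresent(leader -> snapshot.put(partition, leader));
                    }
                    return snapshot;
                });
    }
}
```

In this toy model, once partition 0's entry has been invalidated and B's leader value is already in the store, the cache-only builder yields `{1=C}` (partition 0 silently dropped), while the load-on-miss builder yields `{0=B, 1=C}`. The trade-off is that the push now waits on a store round trip, and a real fix would still need a policy for a lookup that fails or comes back empty.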
The 2-broker integration test structurally masks this (`PulsarClusterSpec.numBrokers = 2`): the survivor transitions *Following→Leading* for every moved partition, and in the Leading path `tryToBecomeLeader` populates the cache *before* firing the listener (`LeaderElectionImpl.java:194-208`), so its snapshots are always correct.

Suggested fixes: build the snapshot from `getLeaderValue()` (async, loads on miss) instead of `getLeaderValueIfPresent()`, and/or debounce-re-push a short delay after any change, and/or re-push when a snapshot was incomplete.

### 2. [BUG] Two paths where the client watch dies silently with no reconnect

`WatchTcAssignmentsDiscovery`

- `attach()`: on a *re*connect (initial snapshot already received), if the newly connected broker doesn't advertise `supports_tc_metadata_discovery` (e.g. per-broker config drift on `transactionCoordinatorScalableTopicsEnabled`), `initialSnapshotFuture.completeExceptionally(...)` is a no-op on the already-completed future and the method returns without calling `scheduleReconnect()`. Discovery is permanently frozen on the last snapshot.
- `onError()`: post-initial errors are only logged. `ClientCnx.handleCommandWatchTcAssignmentsUpdate` has already **removed the session** on error, so no `connectionClosed()` will ever fire for it either. A broker answering `NotAllowedError` / `ServiceNotReady` after a reconnect kills the watch forever.

Both leave all handlers pinned to stale leaders with no recovery. Combined with finding 1, eventual consistency of the assignment map is not guaranteed.

### 3. [BUG] Initial watch open is one-shot: a transient failure hard-fails transaction client startup

`TransactionCoordinatorClientImpl.selectDiscovery` + `WatchTcAssignmentsDiscovery.start`

The probe (`getConnectionToServiceUrl()`) and the watch (`getAnyBrokerProxyConnection()`) can land on *different* brokers. If the watch broker rejects the open (e.g. `ServiceNotReady` while its TC is still initializing, or the feature flag is off there due to config drift), `attach()` / `onError()` fail `initialSnapshotFuture` → `startAsync()` fails outright. There is no retry for retryable errors, so transaction-enabled client startup can fail nondeterministically. Consider retrying the watch open (with the existing `reconnectBackoff`) for retryable failures instead of failing startup.

### 4. [QUALITY] `new URI(null)` NPE for a non-TLS client when a leader advertises only a TLS URL

`TransactionMetaStoreHandler` ctor / `retargetLeader`

`HandlerState.setRedirectedClusterURI` picks `serviceUrl` when `!isUseTls()` (`HandlerState.java:56-59`); `TcAssignment.broker_service_url` is optional and the snapshot delivers `null` for TLS-only brokers. `new URI(null)` throws **NPE**, not `URISyntaxException`, so the constructor's catch doesn't wrap it and `retargetLeader`'s catch doesn't swallow it. It propagates out of `handlers.compute` inside `onSnapshot` on the netty read thread → channel teardown → reconnect → repeat. This requires a misconfigured client, but the failure mode should be a clear error, not an exception loop.

### 5. [QUALITY] `transactionCoordinatorScalableTopicsParallelism` has no validation or cross-broker consistency enforcement

`ServiceConfiguration.java`

There is no `minValue = 1` on the `@FieldContext` (other int configs set it), so `0` or a negative value silently yields a coordinator that leads nothing, with `nextHandler() == null`. More importantly, the doc says "Fixed at cluster bring-up," but nothing enforces agreement: brokers with different values run different election sets, and the snapshot's `parallelism` flip-flops depending on which broker serves the watch. Consider persisting the value in the metadata store on first start and failing/warning on mismatch.

### 6. [QUALITY] The integration test can't prove the new path is exercised

`TcMetadataDiscoveryTest`

If `selectDiscovery`'s probe transiently fails, it *silently* falls back to `AssignTopicTcDiscovery`, and both tests still pass, since the assign topic is initialized with the same partition count (`TC_PARALLELISM`). Consider asserting the selected discovery type (e.g. via the `"Transaction coordinator discovery selected"` log line or a test hook); otherwise a regression that breaks the watch path entirely could go green in CI.

### 7. [QUALITY] Minor notes

- `AssignTopicTcDiscovery.handlers()` reads the `handlers` field twice (`handlers == null ? List.of() : List.of(handlers)`), so a concurrent `close()` between the two reads NPEs. Test-only surface, trivial to fix with a local variable.
- On broker startup with 16 partitions, each elect completion fires every watcher listener, so a connecting client may receive a burst of ~16 full snapshots. Bounded and harmless, but a debounce (which finding 1 wants anyway) would also fix this.
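The debounce suggested in findings 1 and 7 could look roughly like the sketch below, built on a plain `ScheduledExecutorService`. `DebouncedSnapshotPusher` and its `pushSnapshot` callback are hypothetical names, not existing Pulsar classes. The sketch assumes the push re-reads leader values when it runs and that the delay comfortably exceeds the metadata-store round trip, so that (for finding 1) the invalidated cache entry has been repopulated by then; if that assumption cannot be guaranteed, it would need to be combined with a load-on-miss lookup.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical trailing-edge debouncer (not an existing Pulsar class): every
// leadership-change event re-arms a single timer, and the snapshot is pushed
// once, delayMillis after the most recent event.
final class DebouncedSnapshotPusher implements AutoCloseable {
    private final ScheduledExecutorService scheduler;
    private final Runnable pushSnapshot;
    private final long delayMillis;
    private ScheduledFuture<?> pending; // guarded by "this"

    DebouncedSnapshotPusher(ScheduledExecutorService scheduler, Runnable pushSnapshot, long delayMillis) {
        this.scheduler = scheduler;
        this.pushSnapshot = pushSnapshot;
        this.delayMillis = delayMillis;
    }

    // Intended to be called from each election-state listener; cheap and non-blocking.
    synchronized void onLeadershipChange() {
        if (pending != null) {
            pending.cancel(false); // drop the earlier push if it has not started yet
        }
        pending = scheduler.schedule(pushSnapshot, delayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public synchronized void close() {
        if (pending != null) {
            pending.cancel(false);
        }
    }
}
```

Wiring every election listener to `onLeadershipChange()` would collapse the ~16 startup events into one push per quiet period. One caveat of a pure trailing-edge debounce is that a continuous stream of events can postpone the push indefinitely, so a real implementation might also cap the total wait.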
