lhotari commented on PR #25821:
URL: https://github.com/apache/pulsar/pull/25821#issuecomment-4499981571
These are findings from a local Claude Code review. Please check before
merging.
## [BUG] Recovery race: aborted txn discovered via leftover `/txn/op` record is visible until `applyTerminalNow` finishes
For a txn whose `/txn/op` records survived the previous broker's crash and
whose header is ABORTED (or GC'd → defaulted to ABORTED),
`applyHeaderForRecovery` writes `entry.state = ABORTED` into `txns` but does
**not** add to `abortedTxns`. Only the eventual `applyTerminalNow` (enqueued on
`stateTail`) adds to the set. Meanwhile `recoveryFuture.complete(null)` fires
as soon as the per-txn header reads finish, before `stateTail` settles.
`isTxnAborted` consults only `abortedTxns` — not the `txns` map. So in the
window between recovery completing and the queued `applyTerminalNow` finishing,
`isTxnAborted` returns **false** for these recovered-aborted txns. A dispatcher
that's already attached, or a new EARLIEST subscription created in that window,
can read above the watermark (recovery clamps `maxReadPosition` only when there
are recovery-discovered **OPEN** txns) and expose data that should be filtered.
`restartAfterAbort_abortedTxnStillFiltered` doesn't exercise this path
because the test pre-populates the durable aborted record, so `abortedTxns` is
hydrated during `scanAbortedTxns` and the race never opens. The hole is the
"previous broker crashed mid-apply, durable aborted record never written, only
the `/txn/op` record remains" path.
Suggested fix: in `applyHeaderForRecovery`, when `state ==
TxnState.ABORTED`, immediately `abortedTxns.add(txnIdKey)` under the lock.
`applyTerminalNow`'s idempotent `else-if` branch already tolerates this. A
regression test for the "op record exists, header GC'd or ABORTED, no durable
aborted record" scenario would lock this in.
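
A minimal sketch of the suggested fix, under stated assumptions: `RecoveryStateSketch` and its fields are a simplified stand-in modeled on the names in this review, not the PR's actual classes. The real `applyTerminalNow` runs asynchronously on `stateTail`. The point is only that `isTxnAborted` already answers true once the recovery header has been applied, before any deferred terminal apply runs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the buffer state; not the PR's actual classes.
class RecoveryStateSketch {
    enum TxnState { OPEN, COMMITTED, ABORTED }

    private final Object lock = new Object();
    private final Map<String, TxnState> txns = new HashMap<>();
    private final Set<String> abortedTxns = new HashSet<>();

    // Recovery path: record the header state and, for ABORTED, make the txn
    // filterable right away instead of waiting for the queued terminal apply.
    void applyHeaderForRecovery(String txnIdKey, TxnState state) {
        synchronized (lock) {
            txns.put(txnIdKey, state);
            if (state == TxnState.ABORTED) {
                abortedTxns.add(txnIdKey);
            }
        }
    }

    // Deferred terminal apply. Set.add is a no-op when the key is already
    // present, so running this after the recovery path is harmless.
    void applyTerminalNow(String txnIdKey, TxnState newState) {
        synchronized (lock) {
            txns.put(txnIdKey, newState);
            if (newState == TxnState.ABORTED) {
                abortedTxns.add(txnIdKey);
            }
        }
    }

    // Mirrors the behavior described above: only the aborted set is consulted.
    boolean isTxnAborted(String txnIdKey) {
        synchronized (lock) {
            return abortedTxns.contains(txnIdKey);
        }
    }
}
```

A regression test along these lines would call the recovery path for a txn with only an op record and assert `isTxnAborted` before the terminal apply has had a chance to run.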
## [BUG] `persistAbortedRecord` uses current watermark as max-position for recovery-discovered txns; comment claims this is conservative, but the math goes the other way
The trim-driven pruning rule (per the index design in `TxnPaths`) is "prune
aborted records whose max-position < trim point." For an aborted record to
stick around as long as needed, its stored max position must be ≥ the *highest*
position of the txn's data in the segment.
For a recovery-discovered txn the TB doesn't know the real positions, so
`persistAbortedRecord` falls back to `watermark.ledgerId()/entryId()`. But the
watermark is the *lowest* still-resolved point in the segment — the txn's
actual data sits *above* the watermark (that's exactly why recovery pins
`maxReadPosition` at the watermark while these txns remain open). Storing `max
= watermark` means the trim-pruner will drop the aborted record as soon as the
ML trims past the watermark, while the txn's real data is still readable at
higher positions. Once the record is gone, a future restart hydrates
`abortedTxns` without this entry, and `isTxnAborted` returns false → the
aborted entries become visible.
The javadoc says "the worst case is the record sits around longer" — but
`max = watermark` is the *earliest* prunable position, not the latest.
Suggested fix: for the unknown-position case, use the current `maxReadPosition`
(the segment LAC at the time the txn resolves, since the watermark pin is just
being released) or `Long.MAX_VALUE` — both keep the record alive until the
segment is fully trimmed.
This is latent: trim-driven pruning is explicitly deferred per the PR
description, so the buggy max position has no consumer yet. Worth fixing in
this PR before the trim wiring lands — otherwise it's a hidden footgun for the
next author.
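
To make the direction of the inequality concrete, here is a small sketch of the pruning rule as stated above. The `Pos` class, `isPrunable`, and the example positions are hypothetical. Using `Long.MAX_VALUE` for both ledger and entry id is just one way to model the "never prune early" sentinel.

```java
// Hypothetical model of the trim-driven pruning rule; names are illustrative only.
class AbortedRecordPruningSketch {
    // Minimal (ledgerId, entryId) position, ordered by ledger and then entry.
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // Rule as stated in the review: an aborted record is prunable once its
    // stored max position is strictly below the trim point.
    static boolean isPrunable(Pos storedMax, Pos trimPoint) {
        return storedMax.compareTo(trimPoint) < 0;
    }
}
```

With a watermark at (5, 10), txn data at (5, 20), and a trim point at (5, 15), storing `max = watermark` makes the record prunable while the entry at (5, 20) is still readable. Storing the real max position or the sentinel keeps it.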
## [QUALITY] `wasRecoveryDiscovered` parameter is dead
`persistAbortedRecord(String txnIdKey, Position lastPos, boolean
wasRecoveryDiscovered)`: the third parameter is passed but never referenced in
the body. The `lastPos != null` check is what actually gates the fallback.
Either drop the parameter or make it the explicit gate.
## [QUALITY] Empty `else if` branch with the operative side-effect in the condition
```java
} else if (newState == TxnState.ABORTED && abortedTxns.add(txnIdKey)) {
    // Idempotent path — header re-confirms ABORTED and we hadn't recorded it yet...
}
```
Using `Set.add()` for its side effect inside a boolean condition with an
empty body is misleading. Prefer:
```java
} else if (newState == TxnState.ABORTED) {
    abortedTxns.add(txnIdKey);
}
```
## [QUALITY] `persistWatermarkIfAdvanced` doesn't recover from a stale `watermarkVersion` on CAS failure
If `casSegmentWatermark` fails (version mismatch from a split-brain
scenario, or any transient error), the `.exceptionally` handler just logs and
`watermarkVersion` stays stale in memory. Subsequent attempts queued on
`stateTail` reuse the same stale version, so every CAS keeps failing. For the
single-owner segment model this shouldn't happen, but if it does the only
recovery is restarting the TB. Consider re-reading the watermark on CAS failure
to self-heal.
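
One possible shape for the self-healing path, as a synchronous sketch. `VersionedStore` is a hypothetical in-memory stand-in for the metadata store. The real `casSegmentWatermark` is asynchronous and its signature is not shown in this review, so treat the method shapes here as assumptions.

```java
import java.util.OptionalLong;

// Hypothetical sketch; the store API below is an assumption, not the PR's.
class WatermarkCasSketch {
    // In-memory stand-in for a versioned metadata record.
    static final class VersionedStore {
        private long value;
        private long version;

        // Returns {value, version} as one consistent snapshot.
        synchronized long[] snapshot() {
            return new long[] {value, version};
        }

        // Returns the new version on success, empty on version mismatch.
        synchronized OptionalLong compareAndSet(long newValue, long expectedVersion) {
            if (expectedVersion != version) {
                return OptionalLong.empty();
            }
            value = newValue;
            version++;
            return OptionalLong.of(version);
        }
    }

    private final VersionedStore store;
    private long watermarkVersion;   // cached version; may be stale
    private long persistedWatermark; // cached value; may be stale

    WatermarkCasSketch(VersionedStore store, long cachedVersion, long cachedWatermark) {
        this.store = store;
        this.watermarkVersion = cachedVersion;
        this.persistedWatermark = cachedWatermark;
    }

    // Returns true when this call persisted the candidate watermark.
    synchronized boolean persistWatermarkIfAdvanced(long candidate) {
        if (candidate <= persistedWatermark) {
            return false; // nothing to advance
        }
        OptionalLong newVersion = store.compareAndSet(candidate, watermarkVersion);
        if (newVersion.isPresent()) {
            watermarkVersion = newVersion.getAsLong();
            persistedWatermark = candidate;
            return true;
        }
        // CAS failed: refresh the cached value and version so that the next
        // attempt is not doomed to reuse the stale version.
        long[] fresh = store.snapshot();
        persistedWatermark = fresh[0];
        watermarkVersion = fresh[1];
        return false;
    }
}
```

In this sketch the first attempt after an external write fails and refreshes the cache, and the next attempt can succeed without restarting anything.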
## [QUALITY] Two consecutive `synchronized (lock)` blocks in `syncMaxReadPositionForNormalPublish`
The non-txn-publish path acquires the lock, updates `lastDispatchable`,
releases, then re-acquires only to compose onto `stateTail`. Could be one block.
## [QUALITY] Redundant ternary on `expectedVersion`
In `persistWatermarkIfAdvanced`:
```java
Optional<Long> expected = expectedVersion == -1L ? Optional.of(-1L)
        : Optional.of(expectedVersion);
```
Both branches yield the same value — simplify to
`Optional.of(expectedVersion)`.
## [QUALITY] Test coverage gap: the recovery-rebuilds-OPEN scenario lost a useful assertion
The deleted `recovery_rebuildsOpenTxnStateFromOpRecords` checked
`isTxnAborted` for an OPEN recovery-discovered txn (must return false) and
validated that op records on a *different* segment weren't mistakenly recovered
here. The new `recoveryDiscoveredOpenTxn_pinsAtWatermark` covers
`maxReadPosition` pinning but drops both of those assertions. Worth carrying
them forward.
## [INTENT MISMATCH] Soft invariant: "messages above the watermark never reach `isTxnAborted`"
The comment in `isTxnAborted` says "The watermark itself caps the
dispatcher, so messages above it never reach this check." During normal
operation `maxReadPosition` is `previous(min(open firstPositions))` — it caps
at the *lowest* open txn's first write, so committed-but-still-pending-cleanup
data sits above. The watermark-only cap is only true while recovery-discovered
opens exist. The invariant is correct in spirit but the comment's framing is
loose. Fine as-is; just don't write code in a follow-up that relies on it being
literally true.