This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e4de065bb0 [improve][broker] PIP-473 P3.5: durable per-segment visibility state (#25821)
7e4de065bb0 is described below

commit 7e4de065bb04ac5727fd5db9c3a0f41a7b709aa3
Author: Matteo Merli <[email protected]>
AuthorDate: Thu May 21 16:28:51 2026 +0100

    [improve][broker] PIP-473 P3.5: durable per-segment visibility state (#25821)
---
 .../buffer/impl/MetadataTransactionBuffer.java     | 491 ++++++++++++++-------
 .../transaction/metadata/AbortedTxnRecord.java     |  35 ++
 .../transaction/metadata/SegmentWatermark.java     |  36 ++
 .../transaction/metadata/TxnMetadataStore.java     |  78 ++++
 .../broker/transaction/metadata/TxnPaths.java      | 117 ++++-
 .../buffer/impl/MetadataTransactionBufferTest.java | 222 ++++++++--
 6 files changed, 804 insertions(+), 175 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index 46326b91afd..845c1c0328a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.broker.transaction.metadata.SegmentWatermark;
 import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
 import org.apache.pulsar.broker.transaction.metadata.TxnIds;
 import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
@@ -58,35 +59,35 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.ScanConsumer;
 
 /**
- * {@link TransactionBuffer} for {@code segment://} topics that reads truth 
from the metadata-store
- * transaction layout (PIP-473) rather than from a per-topic snapshot log.
+ * {@link TransactionBuffer} for {@code segment://} topics that reads truth 
from the
+ * metadata-store transaction layout (PIP-473).
  *
- * <p>Lifecycle:
- * <ul>
- *   <li><b>Publish</b> — {@link #appendBufferToTxn} reads the txn header 
(cache-first), appends the
- *       entry to the managed ledger, then appends a {@link TxnOp} record under
- *       {@code /txn/op/<txnId>-<seq>}. Both must succeed before we ack the 
publisher.</li>
- *   <li><b>State transitions</b> — driven by {@code 
/txn/segment-events/<segment>-*} sequence
- *       events. The events are wake-ups; the truth is the header. On each 
notification we
- *       re-read headers for every currently-open txn and apply the resulting 
state changes.</li>
- *   <li><b>Recovery</b> (Option C) — scan {@code idx:writes-by-segment} for 
this segment, group by
- *       {@code txnId}, fetch each header, and seed the in-memory cache. Then 
subscribe to the
- *       event stream for forward updates.</li>
- * </ul>
+ * <p><b>Publish-path ordering.</b> {@link #appendBufferToTxn} writes the 
{@code /txn/op} record
+ * <em>before</em> the managed-ledger append. This eliminates the orphan class 
— a crash between
+ * the two writes leaves either (a) no entry and no op record, or (b) an op 
record with no entry
+ * (TC times out → ABORTED → cleanup). There's never a ledger entry with no op 
record. The invariant
+ * <i>every transactional entry in the segment has a corresponding {@code 
/txn/op} record at the
+ * time of append</i> lets recovery scan {@code /txn/op} authoritatively 
without segment-replay.
  *
- * <p><b>TC ordering contract.</b> There is a TOCTOU window between the header 
authorization read
- * in {@link #appendBufferToTxn} and the managed-ledger append: the TC may 
flip the header (commit
- * or abort) in between, and the entry still lands. On commit that's harmless 
— the message is
- * visible. On abort, the subsequent segment-event delivery marks the txn 
ABORTED in the cache and
- * {@link #isTxnAborted} filters it. This relies on the TC publishing the 
segment event <em>after</em>
- * the header CAS lands so a participant that lost the race always learns the 
decision. The legacy
- * {@code TopicTransactionBuffer} has the same window with marker-message 
ordering.
+ * <p><b>Durable visibility state.</b> The TB persists a per-segment
+ * {@code /txn/segment-state/<segment>/watermark} record (the resolved-below 
mark) plus one
+ * {@code aborted/<txnId>} record per aborted txn with still-readable data. 
Together they let
+ * {@link #isTxnAborted} answer correctly for as long as the data is in the 
segment ML, even after
+ * the original {@code /txn/id/<txnId>} headers have been GC'd. The aborted 
records carry the txn's
+ * max position via a secondary index so the TB can range-delete them when the 
segment ML trims its
+ * older data.
+ *
+ * <p><b>Recovery.</b> Load the durable watermark + aborted set; scan {@code 
/txn/op} for this
+ * segment to discover any txns still open at the time of the previous 
shutdown (their first
+ * positions aren't known — the in-memory watermark stays pinned at the 
durable value until they
+ * resolve); subscribe to the segment-event stream.
  *
- * <p><b>In-memory growth.</b> Terminal txns stay in the {@code txns} cache 
for the segment's
- * lifetime so {@code isTxnAborted} can answer authoritatively for dispatcher 
reads — evicting a
- * COMMITTED entry would mean the default "unknown → aborted" filter wrongly 
hides its messages.
- * Long-running segments with high txn turnover will accumulate cached 
entries. Cache pruning tied
- * to data-ledger trimming / header GC is a P5/P6 concern.
+ * <p><b>TC ordering contract.</b> There is a TOCTOU window between the header 
authorization read
+ * and the {@code /txn/op} write: the TC may flip the header (commit or abort) 
in between. The op
+ * record still lands. On commit that's harmless. On abort, the subsequent 
segment-event delivery
+ * marks the txn ABORTED and the per-txn aborted record is written before the 
op record is deleted,
+ * so {@link #isTxnAborted} filters subsequent reads. This relies on the TC 
publishing the segment
+ * event <em>after</em> the header CAS.
  */
 @CustomLog
 public class MetadataTransactionBuffer implements TransactionBuffer {
@@ -101,15 +102,35 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
     private volatile AutoCloseable subscription;
     private volatile boolean closed;
 
-    /** Guards {@link #txns} + {@link #maxReadPosition} + {@link 
#lastDispatchable}. */
+    /** Guards mutable state below. */
     private final Object lock = new Object();
 
-    /** Cached per-txn state, populated by appendBufferToTxn and refreshed by 
event reconcile. */
+    /** In-memory per-txn state for txns this segment is involved in. */
     private final Map<String, TxnEntry> txns = new HashMap<>();
 
-    private Position maxReadPosition;
+    /** Aborted-txn set hydrated from durable state at recovery, updated on 
abort-apply. */
+    private final Set<String> abortedTxns = new HashSet<>();
+
+    /** Count of OPEN txns we discovered at recovery whose first position we 
don't know. */
+    private int recoveryDiscoveredOpen;
+
+    /** Durable watermark, mirrored in memory. May be null on a fresh segment. 
*/
+    private SegmentWatermark watermark;
+    /** Version of the durable watermark record; -1 if it doesn't exist yet. */
+    private long watermarkVersion = -1L;
+
+    /** Latest dispatched position from non-txn publishes — the natural 
ceiling when no opens pin. */
     private Position lastDispatchable;
 
+    /** Current maxReadPosition; never moves above the watermark while 
recovery-discovered opens exist. */
+    private Position maxReadPosition;
+
+    /**
+     * Serialised chain for watermark-persist + op-record-cleanup. Each apply 
enqueues itself on the
+     * tail so we never have two in-flight watermark CASes racing.
+     */
+    private CompletableFuture<Void> stateTail = 
CompletableFuture.completedFuture(null);
+
     private final LongAdder committedCount = new LongAdder();
     private final LongAdder abortedCount = new LongAdder();
 
@@ -136,46 +157,82 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
         }
         subscription = handle;
 
-        // Scan all /txn/op records for this segment, group by txnId.
-        Map<String, List<Position>> opsByTxn = new ConcurrentHashMap<>();
-        txnStore.listWritesBySegment(segmentName, new ScanConsumer() {
-            @Override
-            public void onNext(GetResult r) {
-                TxnOp op = TxnMetadataStore.fromJson(r.getValue(), 
TxnOp.class);
-                String txnIdKey = 
TxnPaths.txnIdFromOpPath(r.getStat().getPath());
-                if (txnIdKey == null) {
-                    return;
-                }
-                opsByTxn.computeIfAbsent(txnIdKey, k -> new ArrayList<>())
-                        .add(PositionFactory.create(op.getLedgerId(), 
op.getEntryId()));
-            }
+        // 1. Load durable watermark.
+        CompletableFuture<Void> watermarkLoad = 
txnStore.getSegmentWatermark(segmentName)
+                .thenAccept(opt -> {
+                    if (opt.isPresent()) {
+                        synchronized (lock) {
+                            watermark = opt.get().value();
+                            watermarkVersion = opt.get().version();
+                            // Initialise maxReadPosition at the watermark so 
we don't expose anything
+                            // above it until in-memory state catches up.
+                            maxReadPosition = PositionFactory.create(
+                                    watermark.ledgerId(), watermark.entryId());
+                        }
+                    }
+                });
 
-            @Override
-            public void onError(Throwable throwable) {
-                // Recovery still fails loudly via the scan's returned future 
and the terminal
-                // whenComplete below; logging here captures the cause with 
segment context.
-                log.warn().attr("segment", segmentName).exception(throwable)
-                        .log("TB recovery scan errored");
-            }
+        // 2. Load the aborted-txn set (scan by segment-scoped index range).
+        CompletableFuture<Void> abortedLoad = watermarkLoad.thenCompose(__ ->
+                txnStore.scanAbortedTxns(segmentName,
+                        
TxnPaths.abortedByPositionSegmentLowerBound(segmentName),
+                        
TxnPaths.abortedByPositionSegmentUpperBound(segmentName),
+                        new ScanConsumer() {
+                            @Override
+                            public void onNext(GetResult r) {
+                                String txnIdKey = 
TxnPaths.txnIdFromAbortedPath(r.getStat().getPath());
+                                if (txnIdKey != null) {
+                                    synchronized (lock) {
+                                        abortedTxns.add(txnIdKey);
+                                    }
+                                }
+                            }
+
+                            @Override
+                            public void onError(Throwable throwable) {
+                                log.warn().attr("segment", 
segmentName).exception(throwable)
+                                        .log("Aborted-txn scan errored during 
recovery");
+                            }
+
+                            @Override
+                            public void onCompleted() {
+                            }
+                        }));
+
+        // 3. Scan /txn/op for this segment to discover txns that were open at 
last shutdown.
+        Map<String, Boolean> writeOpsByTxn = new ConcurrentHashMap<>();
+        CompletableFuture<Void> opsLoad = abortedLoad.thenCompose(__ ->
+                txnStore.listWritesBySegment(segmentName, new ScanConsumer() {
+                    @Override
+                    public void onNext(GetResult r) {
+                        String txnIdKey = 
TxnPaths.txnIdFromOpPath(r.getStat().getPath());
+                        if (txnIdKey != null) {
+                            writeOpsByTxn.put(txnIdKey, Boolean.TRUE);
+                        }
+                    }
 
-            @Override
-            public void onCompleted() {
-            }
-        })
-        .thenCompose(__ -> {
-            // Fan out one header read per distinct txnId; build initial state.
+                    @Override
+                    public void onError(Throwable throwable) {
+                        log.warn().attr("segment", 
segmentName).exception(throwable)
+                                .log("Op-scan errored during recovery");
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                    }
+                }));
+
+        // 4. For each discovered txn, fetch its header and seed the in-memory 
state.
+        opsLoad.thenCompose(__ -> {
             List<CompletableFuture<Void>> reads = new ArrayList<>();
-            opsByTxn.forEach((txnIdKey, positions) -> reads.add(
-                    txnStore.getHeader(txnIdKey).thenAccept(opt -> 
applyHeaderForRecovery(
-                            txnIdKey, opt, positions))));
+            writeOpsByTxn.keySet().forEach(txnIdKey -> reads.add(
+                    txnStore.getHeader(txnIdKey).thenAccept(opt ->
+                            applyHeaderForRecovery(txnIdKey, opt))));
             return FutureUtil.waitForAll(reads);
         })
         .whenComplete((__, err) -> {
             if (err != null) {
                 log.error().attr("segment", 
segmentName).exception(err).log("TB recovery failed");
-                // Close the subscription we opened above so the listener 
doesn't outlive a
-                // failed-to-recover TB instance (closeAsync may never be 
called if recovery never
-                // succeeded).
                 closeSubscriptionQuietly();
                 recoveryFuture.completeExceptionally(err);
                 return;
@@ -184,22 +241,33 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
                 recomputeMaxReadPositionLocked();
             }
             recoveryFuture.complete(null);
-            // Drain any events that fired between subscribe and now — 
triggerReconcile short-
-            // circuits while recoveryFuture is not done, so we explicitly 
kick a reconcile pass
-            // now to pick up state transitions whose only notification landed 
in that window.
+            // Drain any events that fired between subscribe and now.
             triggerReconcile();
         });
     }
 
-    private void applyHeaderForRecovery(String txnIdKey, 
Optional<Versioned<TxnHeader>> opt, List<Position> positions) {
+    private void applyHeaderForRecovery(String txnIdKey, 
Optional<Versioned<TxnHeader>> opt) {
         TxnState state = opt.map(v -> 
v.value().getState()).orElse(TxnState.ABORTED);
-        Position first = 
positions.stream().min(Position::compareTo).orElse(null);
         synchronized (lock) {
-            txns.put(txnIdKey, new TxnEntry(state, first));
+            TxnEntry entry = new TxnEntry(state);
+            if (state == TxnState.OPEN) {
+                entry.recoveryDiscovered = true;
+                recoveryDiscoveredOpen++;
+            } else if (state == TxnState.ABORTED) {
+                // Hydrate the aborted set now, under the lock, so 
isTxnAborted is correct the
+                // instant recoveryFuture completes. The terminal apply that 
would otherwise add
+                // this txn runs later on stateTail (after recovery 
completes), leaving a window
+                // in which the txn's data — which isn't watermark-pinned, 
since ABORTED entries
+                // don't set recoveryDiscovered — would read as visible. add() 
is idempotent with
+                // applyTerminalNow's aborted-set update.
+                abortedTxns.add(txnIdKey);
+            }
+            txns.put(txnIdKey, entry);
         }
-        // Schedule op-record cleanup for terminal txns (best-effort, async).
+        // Terminal txns with leftover /txn/op records still need their 
outcome materialised and
+        // the records cleaned up — enqueue an apply.
         if (state.isTerminal()) {
-            cleanupOpRecords(txnIdKey);
+            enqueueApplyTerminal(txnIdKey, state);
         }
     }
 
@@ -207,7 +275,6 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
 
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long 
sequenceId, ByteBuf buffer) {
-        // Retain so the buffer survives the chain — release in the terminal 
handlers.
         buffer.retain();
         return recoveryFuture.thenCompose(__ -> internalAppend(txnId, buffer))
                 .whenComplete((p, ex) -> buffer.release());
@@ -225,8 +292,16 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
                         return FutureUtil.failedFuture(new 
BrokerServiceException.NotAllowedException(
                                 "Transaction " + txnId + " is already " + 
state + " — TxnConflict"));
                     }
-                    return appendToLedger(buffer)
-                            .thenCompose(position -> recordOp(txnId, txnIdKey, 
position));
+                    // 1. Write /txn/op first. Positions are unknown at this 
point; the WRITE-kind
+                    // record uses sentinel 0/0 (positions live in the TB's 
in-memory tracking).
+                    TxnOp op = new TxnOp(TxnOpKind.WRITE, segmentName, null, 
0L, 0L, null);
+                    return txnStore.appendOp(txnIdKey, op).thenCompose(opStat 
->
+                            // 2. ML append. By the invariant above, /txn/op 
is durable before any
+                            // ledger entry exists for this op.
+                            appendToLedger(buffer).thenApply(position -> {
+                                trackPosition(txnIdKey, position);
+                                return position;
+                            }));
                 });
     }
 
@@ -240,7 +315,7 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
         return txnStore.getHeader(txnIdKey).thenApply(opt -> {
             TxnState state = opt.map(v -> 
v.value().getState()).orElse(TxnState.ABORTED);
             synchronized (lock) {
-                txns.putIfAbsent(txnIdKey, new TxnEntry(state, null));
+                txns.putIfAbsent(txnIdKey, new TxnEntry(state));
             }
             return state;
         });
@@ -262,36 +337,39 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
         return result;
     }
 
-    private CompletableFuture<Position> recordOp(TxnID txnId, String txnIdKey, 
Position position) {
-        TxnOp op = new TxnOp(TxnOpKind.WRITE, segmentName, null,
-                position.getLedgerId(), position.getEntryId(), null);
-        return txnStore.appendOp(txnIdKey, op).thenApply(stat -> {
-            synchronized (lock) {
-                TxnEntry entry = txns.get(txnIdKey);
-                // Only an OPEN entry pins maxReadPosition. If a concurrent 
reconcile flipped this
-                // txn to terminal between the cache-first authorization read 
and this update, do
-                // NOT resurrect it as OPEN — the ML append still lands but 
isTxnAborted will mask
-                // it for aborted txns (see class javadoc on the publish-side 
TOCTOU window).
-                if (entry != null && entry.state == TxnState.OPEN) {
-                    if (entry.firstPosition == null || 
position.compareTo(entry.firstPosition) < 0) {
-                        entry.firstPosition = position;
-                        recomputeMaxReadPositionLocked();
-                    }
-                }
+    private void trackPosition(String txnIdKey, Position position) {
+        synchronized (lock) {
+            TxnEntry entry = txns.get(txnIdKey);
+            // Only an OPEN, non-recovery entry tracks positions. 
Recovery-discovered entries pin
+            // at the watermark and shouldn't be re-keyed by later appends (we 
don't know the
+            // earliest position).
+            if (entry == null || entry.state != TxnState.OPEN || 
entry.recoveryDiscovered) {
+                return;
             }
-            return position;
-        });
+            if (entry.firstPosition == null || 
position.compareTo(entry.firstPosition) < 0) {
+                entry.firstPosition = position;
+            }
+            if (entry.lastPosition == null || 
position.compareTo(entry.lastPosition) > 0) {
+                entry.lastPosition = position;
+            }
+            recomputeMaxReadPositionLocked();
+        }
     }
 
     // ---- Reconcile (event-driven) -----------------------------------------
 
     private void triggerReconcile() {
-        if (closed || !recoveryFuture.isDone()) {
+        if (closed || !recoveryFuture.isDone() || 
recoveryFuture.isCompletedExceptionally()) {
             return;
         }
         Set<String> snapshot;
         synchronized (lock) {
-            snapshot = new HashSet<>(openTxnsLocked());
+            snapshot = new HashSet<>();
+            for (Map.Entry<String, TxnEntry> e : txns.entrySet()) {
+                if (e.getValue().state == TxnState.OPEN) {
+                    snapshot.add(e.getKey());
+                }
+            }
         }
         if (snapshot.isEmpty()) {
             return;
@@ -300,7 +378,9 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
         for (String txnIdKey : snapshot) {
             reads.add(txnStore.getHeader(txnIdKey).thenAccept(opt -> {
                 TxnState newState = opt.map(v -> 
v.value().getState()).orElse(TxnState.ABORTED);
-                applyReconciledState(txnIdKey, newState);
+                if (newState.isTerminal()) {
+                    enqueueApplyTerminal(txnIdKey, newState);
+                }
             }));
         }
         FutureUtil.waitForAll(reads).whenComplete((__, err) -> {
@@ -310,75 +390,182 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
         });
     }
 
-    private void applyReconciledState(String txnIdKey, TxnState newState) {
-        boolean cleanup = false;
+    /**
+     * Enqueue the durable side-effects for a txn that has been observed 
terminal: write the
+     * aborted record (if any), advance the persisted watermark, delete the 
{@code /txn/op}
+     * records. Serialised through {@link #stateTail} so concurrent applies 
don't race the
+     * watermark CAS.
+     */
+    private void enqueueApplyTerminal(String txnIdKey, TxnState newState) {
+        synchronized (lock) {
+            stateTail = stateTail.thenCompose(__ -> applyTerminalNow(txnIdKey, 
newState))
+                    .exceptionally(err -> {
+                        log.warn().attr("segment", segmentName).attr("txnId", 
txnIdKey)
+                                .exception(err).log("Apply-terminal failed; 
will retry on next reconcile");
+                        return null;
+                    });
+        }
+    }
+
+    private CompletableFuture<Void> applyTerminalNow(String txnIdKey, TxnState 
newState) {
+        // Snapshot the entry under the lock — we need its positions / 
recovery flag to decide
+        // what to persist.
+        boolean alreadyTerminal;
+        Position lastPos;
         synchronized (lock) {
             TxnEntry entry = txns.get(txnIdKey);
-            if (entry == null || entry.state == newState) {
-                return;
+            if (entry == null) {
+                return CompletableFuture.completedFuture(null);
             }
-            entry.state = newState;
-            if (newState.isTerminal()) {
-                entry.firstPosition = null;
-                cleanup = true;
+            alreadyTerminal = entry.state.isTerminal();
+            lastPos = entry.lastPosition;
+            // Mark in-memory now so subsequent appendBufferToTxn for this txn 
fail with TxnConflict.
+            if (!alreadyTerminal) {
+                entry.state = newState;
+                if (entry.recoveryDiscovered) {
+                    recoveryDiscoveredOpen = Math.max(0, 
recoveryDiscoveredOpen - 1);
+                    entry.recoveryDiscovered = false;
+                }
                 if (newState == TxnState.COMMITTED) {
                     committedCount.increment();
-                } else {
+                } else if (newState == TxnState.ABORTED) {
                     abortedCount.increment();
+                    abortedTxns.add(txnIdKey);
                 }
                 recomputeMaxReadPositionLocked();
+            } else if (newState == TxnState.ABORTED) {
+                // Idempotent path — header re-confirms ABORTED. Make sure the 
in-memory set holds
+                // it (e.g. after an in-memory rebuild that lost the set).
+                abortedTxns.add(txnIdKey);
             }
         }
-        if (cleanup) {
-            cleanupOpRecords(txnIdKey);
-        }
+
+        // Persist aborted record if this is an abort.
+        CompletableFuture<Void> persistAborted = (newState == TxnState.ABORTED)
+                ? persistAbortedRecord(txnIdKey, lastPos)
+                : CompletableFuture.completedFuture(null);
+
+        return persistAborted
+                .thenCompose(__ -> persistWatermarkIfAdvanced())
+                .thenCompose(__ -> 
txnStore.deleteWriteOpsForSegmentAndTxn(segmentName, txnIdKey));
     }
 
     /**
-     * Delete every {@code /txn/op} record for {@code (this segment, 
txnIdKey)}. Best-effort —
-     * failures are logged and retried by the next reconcile.
+     * Write {@code /txn/segment-state/<segment>/aborted/<txnId>} with the 
txn's max position in
+     * this segment. The stored position is the prune key: trim-driven pruning 
drops an aborted
+     * record once the segment trims past it, so it must be at least as high 
as the txn's highest
+     * data position, or the record would be dropped while its data is still 
readable.
+     *
+     * <p>When the positions are unknown (a recovery-discovered txn with no 
new appends) we fall
+     * back to the current segment LAC. The txn's data was written in a prior 
epoch, so it cannot
+     * sit above the LAC — that makes the LAC a correct conservative upper 
bound. The durable
+     * watermark would be wrong here: the txn's data sits <em>above</em> the 
watermark, so pruning
+     * keyed on the watermark would discard the record too early.
      */
-    private void cleanupOpRecords(String txnIdKey) {
-        txnStore.deleteWriteOpsForSegmentAndTxn(segmentName, txnIdKey)
-                .exceptionally(err -> {
-                    log.warn().attr("segment", segmentName).attr("txnId", 
txnIdKey).exception(err)
-                            .log("Op-record cleanup failed; will retry on next 
reconcile");
-                    return null;
-                });
+    private CompletableFuture<Void> persistAbortedRecord(String txnIdKey, 
Position lastPos) {
+        long maxLedger;
+        long maxEntry;
+        if (lastPos != null) {
+            maxLedger = lastPos.getLedgerId();
+            maxEntry = lastPos.getEntryId();
+        } else {
+            Position lac = ledger.getLastConfirmedEntry();
+            maxLedger = lac == null ? 0L : lac.getLedgerId();
+            maxEntry = lac == null ? 0L : lac.getEntryId();
+        }
+        return txnStore.putAbortedTxn(segmentName, txnIdKey, maxLedger, 
maxEntry).thenApply(s -> null);
     }
 
-    // ---- maxReadPosition ---------------------------------------------------
-
-    private Set<String> openTxnsLocked() {
-        Set<String> open = new HashSet<>();
-        for (Map.Entry<String, TxnEntry> e : txns.entrySet()) {
-            if (e.getValue().state == TxnState.OPEN) {
-                open.add(e.getKey());
+    /**
+     * If the in-memory watermark position has advanced beyond the durable 
one, CAS-write the new
+     * value. Idempotent and self-skipping when there's nothing to do.
+     */
+    private CompletableFuture<Void> persistWatermarkIfAdvanced() {
+        SegmentWatermark toWrite;
+        long expectedVersion;
+        synchronized (lock) {
+            Position desired = maxReadPosition;
+            if (desired == null) {
+                return CompletableFuture.completedFuture(null);
+            }
+            if (watermark != null
+                    && watermark.ledgerId() == desired.getLedgerId()
+                    && watermark.entryId() == desired.getEntryId()) {
+                return CompletableFuture.completedFuture(null);
             }
+            // Only advance forward.
+            if (watermark != null) {
+                Position existing = 
PositionFactory.create(watermark.ledgerId(), watermark.entryId());
+                if (desired.compareTo(existing) <= 0) {
+                    return CompletableFuture.completedFuture(null);
+                }
+            }
+            toWrite = new SegmentWatermark(desired.getLedgerId(), 
desired.getEntryId());
+            expectedVersion = watermarkVersion;
         }
-        return open;
+        Optional<Long> expected = Optional.of(expectedVersion);
+        return txnStore.casSegmentWatermark(segmentName, toWrite, expected)
+                .thenAccept(stat -> {
+                    synchronized (lock) {
+                        watermark = toWrite;
+                        watermarkVersion = stat.getVersion();
+                    }
+                })
+                .exceptionallyCompose(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    if (cause instanceof 
MetadataStoreException.BadVersionException) {
+                        // Our in-memory version is stale (a concurrent writer 
moved the record).
+                        // Re-read so the next enqueued apply CASes against 
the current version
+                        // instead of looping on the stale one. Still 
propagate this failure so the
+                        // caller logs+retries; the retry now has a fresh 
version to work with.
+                        return 
txnStore.getSegmentWatermark(segmentName).thenAccept(opt -> {
+                            if (opt.isPresent()) {
+                                synchronized (lock) {
+                                    watermark = opt.get().value();
+                                    watermarkVersion = opt.get().version();
+                                }
+                            }
+                        }).thenCompose(__ -> FutureUtil.failedFuture(cause));
+                    }
+                    return FutureUtil.failedFuture(cause);
+                });
     }
 
+    // ---- maxReadPosition ---------------------------------------------------
+
     private void recomputeMaxReadPositionLocked() {
-        Position min = null;
-        for (TxnEntry e : txns.values()) {
-            if (e.state == TxnState.OPEN && e.firstPosition != null) {
-                if (min == null || e.firstPosition.compareTo(min) < 0) {
-                    min = e.firstPosition;
+        Position next;
+        Position watermarkPos = (watermark == null) ? null
+                : PositionFactory.create(watermark.ledgerId(), 
watermark.entryId());
+
+        if (recoveryDiscoveredOpen > 0) {
+            // Pinned at the durable watermark while any recovery-discovered 
open txn remains:
+            // we don't know their first positions and mustn't advance past 
them.
+            next = watermarkPos != null ? watermarkPos : maxReadPosition;
+        } else {
+            Position min = null;
+            for (TxnEntry e : txns.values()) {
+                if (e.state == TxnState.OPEN && e.firstPosition != null) {
+                    if (min == null || e.firstPosition.compareTo(min) < 0) {
+                        min = e.firstPosition;
+                    }
                 }
             }
+            if (min != null) {
+                next = ledger.getPreviousPosition(min);
+            } else {
+                // No open txns pinning anything: free to advance to 
last-dispatched.
+                next = lastDispatchable;
+            }
         }
-        Position next = (min == null) ? lastDispatchable : 
ledger.getPreviousPosition(min);
         Position prev = maxReadPosition;
         maxReadPosition = next;
-        // Only fire the callback on forward motion. Initial-state setup at 
recovery may move
-        // the position backwards (LAC -> previous(firstOpenWrite)); that's 
not a "moved forward".
         if (next.compareTo(prev) > 0 && maxReadPositionCallBack != null) {
             maxReadPositionCallBack.maxReadPositionMovedForward(prev, next);
         }
     }
 
-    // ---- SPI surface (lifecycle, queries) ---------------------------------
+    // ---- SPI surface -------------------------------------------------------
 
     @Override
     public Position getMaxReadPosition() {
@@ -391,13 +578,11 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
     public boolean isTxnAborted(TxnID txnID, Position readPosition) {
         String key = TxnIds.toKey(txnID);
         synchronized (lock) {
-            TxnEntry entry = txns.get(key);
-            if (entry == null) {
-                // No record of this txn — must be either orphan (broker crash 
mid-publish) or
-                // long-aborted-and-cleaned. Filtering is the safe default.
-                return true;
-            }
-            return entry.state == TxnState.ABORTED;
+            // New semantics (P3.5): default is committed/visible; only txns 
explicitly in the
+            // aborted set are filtered. maxReadPosition caps what the 
dispatcher delivers — at the
+            // lowest open txn's first write in steady state, or pinned at the 
watermark while
+            // recovery-discovered opens remain — so reads above that cap 
don't reach this check.
+            return abortedTxns.contains(key);
         }
     }
 
@@ -410,6 +595,13 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
         synchronized (lock) {
             lastDispatchable = position;
             recomputeMaxReadPositionLocked();
+            // Persist the new watermark if it advanced as a result of the 
non-txn append.
+            stateTail = stateTail.thenCompose(__ -> 
persistWatermarkIfAdvanced())
+                    .exceptionally(err -> {
+                        log.warn().attr("segment", segmentName).exception(err)
+                                .log("Watermark persist on normal publish 
failed; will retry");
+                        return null;
+                    });
         }
     }
 
@@ -440,19 +632,14 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
 
     @Override
     public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
-        // No-op for the metadata-driven TB: v5 commits are driven by the TC 
writing /txn header CAS
-        // and the segment-event stream, not by direct SPI calls.
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
-        // No-op (see commitTxn).
         return CompletableFuture.completedFuture(null);
     }
 
-    // ---- SPI surface (snapshots / readers — unused in v5) -----------------
-
     @Override
     public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
         return CompletableFuture.completedFuture(null);
@@ -526,11 +713,19 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
 
     private static final class TxnEntry {
         TxnState state;
-        Position firstPosition; // null if no writes on this segment yet, or 
after termination
-
-        TxnEntry(TxnState state, Position firstPosition) {
+        /** Earliest position the TB itself has seen on this segment for this 
txn. */
+        Position firstPosition;
+        /** Latest position the TB itself has seen on this segment for this 
txn. */
+        Position lastPosition;
+        /**
+         * True if this entry was created by recovery from a leftover {@code 
/txn/op} record —
+         * the TB doesn't know the real positions and pins {@code 
maxReadPosition} at the watermark
+         * until this txn resolves.
+         */
+        boolean recoveryDiscovered;
+
+        TxnEntry(TxnState state) {
             this.state = state;
-            this.firstPosition = firstPosition;
         }
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/AbortedTxnRecord.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/AbortedTxnRecord.java
new file mode 100644
index 00000000000..5893275ae70
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/AbortedTxnRecord.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+/**
+ * Per-aborted-txn durable record for a segment. Stored at
+ * {@code /txn/segment-state/aborted/<encoded-segment>:<txnId>} with
+ * {@code partitionKey = segmentKey(segment)} and an {@code 
idx:txn-aborted-by-position}
+ * secondary-index entry keyed by {@code <encoded-segment>:<padded-position>}.
+ *
+ * <p>Carries the txn's highest position in this segment. The position-keyed 
index lets the TB
+ * range-delete aborted records as the segment ML trims its earliest data — 
agnostic to how the
+ * ML organises its underlying storage.
+ *
+ * @param maxLedgerId ledger id of the aborted txn's highest position in this 
segment
+ * @param maxEntryId  entry id of the aborted txn's highest position in this 
segment
+ */
+public record AbortedTxnRecord(long maxLedgerId, long maxEntryId) {
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/SegmentWatermark.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/SegmentWatermark.java
new file mode 100644
index 00000000000..1a9a2be3334
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/SegmentWatermark.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+/**
+ * Per-segment durable watermark record. Stored at
+ * {@code /txn/segment-state/watermark/<encoded-segment>} with
+ * {@code partitionKey = segmentKey(segment)}.
+ *
+ * <p>The position below which every transactional message in the segment is 
fully resolved —
+ * either committed-and-visible or in the segment's aborted-txn set. The 
dispatcher never reads
+ * above this position. Identical in meaning to today's {@code 
maxReadPosition}, but persisted so
+ * the TB can survive a restart and a new {@code EARLIEST} subscription can 
still classify old
+ * transactional data after the original {@code /txn/id/<txnId>} headers have 
been GC'd.
+ *
+ * @param ledgerId managed-ledger ledger id of the watermark position
+ * @param entryId  managed-ledger entry id of the watermark position
+ */
+public record SegmentWatermark(long ledgerId, long entryId) {
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index 7f95a9be907..323473ac8b1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -303,6 +303,84 @@ public class TxnMetadataStore {
                 listener, Set.of(new Option.PartitionKey(pk)));
     }
 
+    // ---- Per-segment durable visibility state -----------------------------
+
+    /**
+     * Read the segment's watermark record, or {@link Optional#empty()} if it 
doesn't exist (a
+     * fresh segment that has never had a transactional message).
+     */
+    public CompletableFuture<Optional<Versioned<SegmentWatermark>>> 
getSegmentWatermark(String segment) {
+        Set<Option> opts = Set.of(new 
Option.PartitionKey(TxnPaths.segmentKey(segment)));
+        return store.get(TxnPaths.segmentWatermarkPath(segment), opts)
+                .thenApply(opt -> opt.map(gr -> new Versioned<>(
+                        fromJson(gr.getValue(), SegmentWatermark.class), 
gr.getStat().getVersion())));
+    }
+
+    /**
+     * CAS-write the segment's watermark. Pass {@code Optional.of(-1L)} to 
create (must not
+     * exist), a version from a prior {@link #getSegmentWatermark} to update, 
or empty to overwrite.
+     */
+    public CompletableFuture<Stat> casSegmentWatermark(String segment, 
SegmentWatermark watermark,
+                                                       Optional<Long> 
expectedVersion) {
+        Set<Option> opts = Set.of(new 
Option.PartitionKey(TxnPaths.segmentKey(segment)));
+        return store.put(TxnPaths.segmentWatermarkPath(segment), 
toJson(watermark),
+                expectedVersion, opts);
+    }
+
+    /**
+     * Persist a per-aborted-txn record for {@code (segment, txnId)}. Also 
writes the
+     * {@link TxnPaths#IDX_TXN_ABORTED_BY_POSITION} secondary-index entry 
keyed by the max
+     * position so the TB can range-delete on ML trim.
+     */
+    public CompletableFuture<Stat> putAbortedTxn(String segment, String txnId, 
long maxLedgerId,
+                                                 long maxEntryId) {
+        AbortedTxnRecord record = new AbortedTxnRecord(maxLedgerId, 
maxEntryId);
+        Option.SecondaryIndex idx = new 
Option.SecondaryIndex(TxnPaths.IDX_TXN_ABORTED_BY_POSITION,
+                TxnPaths.abortedByPositionIndexKey(segment, maxLedgerId, 
maxEntryId));
+        Set<Option> opts = Set.of(new 
Option.PartitionKey(TxnPaths.segmentKey(segment)), idx);
+        return store.put(TxnPaths.segmentAbortedTxnPath(segment, txnId), 
toJson(record),
+                Optional.empty(), opts);
+    }
+
+    /**
+     * Stream all aborted-txn records for {@code segment} whose max position 
falls in
+     * {@code [fromKeyInclusive, toKeyInclusive]} (use {@code null} on either 
bound for
+     * unbounded). Use {@link TxnPaths#abortedByPositionSegmentLowerBound} /
+     * {@link TxnPaths#abortedByPositionSegmentUpperBound} for the 
segment-scoped full range.
+     */
+    public CompletableFuture<Void> scanAbortedTxns(String segment,
+                                                   String fromKeyInclusive, 
String toKeyInclusive,
+                                                   ScanConsumer consumer) {
+        String segKey = TxnPaths.segmentKey(segment);
+        return store.scanByIndex(TxnPaths.TXN_SEGMENT_ABORTED_PREFIX,
+                TxnPaths.IDX_TXN_ABORTED_BY_POSITION,
+                fromKeyInclusive, toKeyInclusive,
+                gr -> {
+                    // Fallback for stores without native indexes: the records 
are flat children
+                    // of TXN_SEGMENT_ABORTED_PREFIX named "<segKey>:<txnId>" 
— match by segKey.
+                    return 
segKey.equals(TxnPaths.segmentKeyFromAbortedPath(gr.getStat().getPath()));
+                },
+                consumer);
+    }
+
+    /**
+     * Delete a single aborted-txn record (and its index entry).
+     */
+    public CompletableFuture<Void> deleteAbortedTxn(String segment, String 
txnId) {
+        Set<Option> opts = Set.of(new 
Option.PartitionKey(TxnPaths.segmentKey(segment)));
+        return store.deleteIfExists(TxnPaths.segmentAbortedTxnPath(segment, 
txnId),
+                Optional.empty(), opts);
+    }
+
+    /**
+     * Delete the segment's watermark record — used by the TB at 
segment-teardown alongside
+     * deleting any remaining aborted records.
+     */
+    public CompletableFuture<Void> deleteSegmentWatermark(String segment) {
+        Set<Option> opts = Set.of(new 
Option.PartitionKey(TxnPaths.segmentKey(segment)));
+        return store.deleteIfExists(TxnPaths.segmentWatermarkPath(segment), 
Optional.empty(), opts);
+    }
+
     // ---- JSON helpers ------------------------------------------------------
 
     /** @return UTF-8 JSON bytes for {@code value}. Wraps any I/O error as 
{@link CompletionException}. */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index d9dbbb6b79a..f68d8d73bd1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -62,6 +62,13 @@ public final class TxnPaths {
      */
     public static final String TXN_SUBSCRIPTION_EVENTS_PREFIX = 
"/txn/subscription-events";
 
+    /**
+     * Path prefix for per-segment durable visibility state. Holds the 
segment's watermark
+     * record and one aborted-txn record per aborted txn with still-readable 
data in the
+     * segment. All records co-locate via {@code partitionKey = 
segmentKey(segment)}.
+     */
+    public static final String TXN_SEGMENT_STATE_PREFIX = "/txn/segment-state";
+
     /** Index: list write ops by segment. Key = segment. */
     public static final String IDX_WRITES_BY_SEGMENT = "idx:writes-by-segment";
 
@@ -77,6 +84,14 @@ public final class TxnPaths {
      */
     public static final String IDX_TXN_BY_FINAL_STATE = 
"idx:txn-by-final-state";
 
+    /**
+     * Index: list per-segment aborted-txn records by max position. Key =
+     * {@code <encoded-segment>:padded(ledgerId):padded(entryId)}. Used by the 
TB to scan
+     * its segment's aborted txns into an in-memory set on recovery, and to 
range-delete
+     * the records as the segment ML trims past their max position.
+     */
+    public static final String IDX_TXN_ABORTED_BY_POSITION = 
"idx:txn-aborted-by-position";
+
     /** Width used when formatting long values into 
lexicographically-orderable index keys. */
     public static final int LONG_KEY_WIDTH = 20;
 
@@ -86,6 +101,18 @@ public final class TxnPaths {
      */
     public static final String MAX_LONG_KEY = "99999999999999999999";
 
+    /**
+     * The minimum {@link #LONG_KEY_WIDTH}-wide decimal — useful as the lower 
bound of a
+     * range scan that starts at position zero.
+     */
+    public static final String MIN_LONG_KEY = "00000000000000000000";
+
+    /** Suffix selecting the lowest {@code (ledgerId, entryId)} position in a 
per-segment range. */
+    private static final String MIN_POSITION_SUFFIX = ":" + MIN_LONG_KEY + ":" 
+ MIN_LONG_KEY;
+
+    /** Suffix selecting the highest {@code (ledgerId, entryId)} position in a 
per-segment range. */
+    private static final String MAX_POSITION_SUFFIX = ":" + MAX_LONG_KEY + ":" 
+ MAX_LONG_KEY;
+
     /** @return {@code /txn/id/<txnId>} — the header path for {@code txnId}. */
     public static String header(String txnId) {
         return TXN_HEADER_PREFIX + "/" + txnId;
@@ -134,9 +161,97 @@ public final class TxnPaths {
         return segmentKey(segment) + ":" + Codec.encode(subscription);
     }
 
+    /** Parent path for per-segment watermark records. Records are direct 
children of this. */
+    public static final String TXN_SEGMENT_WATERMARK_PREFIX = 
TXN_SEGMENT_STATE_PREFIX + "/watermark";
+
+    /** Parent path for per-aborted-txn records, flat across all segments. */
+    public static final String TXN_SEGMENT_ABORTED_PREFIX = 
TXN_SEGMENT_STATE_PREFIX + "/aborted";
+
+    /**
+     * @return {@code /txn/segment-state/watermark/<encoded-segment>} — 
durable watermark record
+     *     for {@code segment}. Direct child of {@link 
#TXN_SEGMENT_WATERMARK_PREFIX} so the
+     *     fallback {@code scanByIndex} path works on backends without a 
native secondary index.
+     */
+    public static String segmentWatermarkPath(String segment) {
+        return TXN_SEGMENT_WATERMARK_PREFIX + "/" + segmentKey(segment);
+    }
+
+    /**
+     * @return {@code /txn/segment-state/aborted/<encoded-segment>:<txnId>} — 
durable
+     *     per-aborted-txn record. Direct child of {@link 
#TXN_SEGMENT_ABORTED_PREFIX} (flat
+     *     across all segments) so the fallback scan finds it; {@code 
<encoded-segment>:<txnId>}
+     *     keeps the per-segment grouping addressable.
+     */
+    public static String segmentAbortedTxnPath(String segment, String txnId) {
+        return TXN_SEGMENT_ABORTED_PREFIX + "/" + segmentKey(segment) + ":" + 
txnId;
+    }
+
+    /**
+     * @return the {@link #IDX_TXN_ABORTED_BY_POSITION} index key for a 
per-segment aborted-txn
+     *     record. Format: {@code 
<encoded-segment>:padded(ledgerId):padded(entryId)} — the
+     *     encoded prefix scopes scans to one segment; the padded position is 
lexicographically
+     *     ordered so range scans by trim point work naturally.
+     */
+    public static String abortedByPositionIndexKey(String segment, long 
ledgerId, long entryId) {
+        return segmentKey(segment) + ":" + longKey(ledgerId) + ":" + 
longKey(entryId);
+    }
+
+    /** @return the lower bound of the per-segment range in {@link 
#IDX_TXN_ABORTED_BY_POSITION}. */
+    public static String abortedByPositionSegmentLowerBound(String segment) {
+        return segmentKey(segment) + MIN_POSITION_SUFFIX;
+    }
+
+    /** @return the upper bound of the per-segment range in {@link 
#IDX_TXN_ABORTED_BY_POSITION}. */
+    public static String abortedByPositionSegmentUpperBound(String segment) {
+        return segmentKey(segment) + MAX_POSITION_SUFFIX;
+    }
+
+    /**
+     * Extract the {@code txnId} part from a path returned by {@link 
#segmentAbortedTxnPath}. The
+     * path is {@code .../aborted/<encoded-segment>:<txnId>}; {@code 
encoded-segment} is URL-
+     * encoded (no {@code :}) so the first {@code :} in the trailing name is 
the segment / txn
+     * separator.
+     *
+     * @return the txnId key, or {@code null} if {@code abortedPath} doesn't 
match
+     */
+    public static String txnIdFromAbortedPath(String abortedPath) {
+        int lastSlash = abortedPath.lastIndexOf('/');
+        if (lastSlash < 0) {
+            return null;
+        }
+        String name = abortedPath.substring(lastSlash + 1);
+        int colon = name.indexOf(':');
+        if (colon < 0 || colon == name.length() - 1) {
+            return null;
+        }
+        return name.substring(colon + 1);
+    }
+
+    /**
+     * @return the {@code segmentKey} part from a path returned by {@link 
#segmentAbortedTxnPath}.
+     */
+    public static String segmentKeyFromAbortedPath(String abortedPath) {
+        int lastSlash = abortedPath.lastIndexOf('/');
+        if (lastSlash < 0) {
+            return null;
+        }
+        String name = abortedPath.substring(lastSlash + 1);
+        int colon = name.indexOf(':');
+        return colon < 0 ? null : name.substring(0, colon);
+    }
+
     /** @return {@code value} formatted as a zero-padded fixed-width decimal 
for use as a range-scan index key. */
     public static String longKey(long value) {
-        return String.format("%0" + LONG_KEY_WIDTH + "d", value);
+        // value is always non-negative here (ledger/entry ids, epoch millis), 
and a long never
+        // exceeds LONG_KEY_WIDTH digits — so build the padded string directly 
and skip the
+        // String.format overhead on this index-key hot path.
+        char[] buf = new char[LONG_KEY_WIDTH];
+        long v = value;
+        for (int i = LONG_KEY_WIDTH - 1; i >= 0; i--) {
+            buf[i] = (char) ('0' + (int) (v % 10));
+            v /= 10;
+        }
+        return new String(buf);
     }
 
     /** @return the composite final-state index key {@code 
<state>:padded(finalizedMs)}. */
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
index 44b6e01aaa4..804e12d1bd5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -29,6 +29,7 @@ import io.netty.buffer.Unpooled;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -36,12 +37,14 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.metadata.AbortedTxnRecord;
 import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
 import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
 import org.apache.pulsar.broker.transaction.metadata.TxnIds;
 import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
 import org.apache.pulsar.broker.transaction.metadata.TxnOp;
 import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnPaths;
 import org.apache.pulsar.broker.transaction.metadata.TxnState;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -171,47 +174,214 @@ public class MetadataTransactionBufferTest {
     }
 
     @Test
-    public void unknownTxn_isTxnAbortedReturnsTrue() throws Exception {
+    public void unknownTxn_isTxnAbortedReturnsFalse() throws Exception {
+        // P3.5 semantics: "below watermark + not in aborted set → visible". 
Unknown txns default
+        // visible — orphans are eliminated by the publish-path ordering 
reversal (op-record
+        // before ML append), so a txnId we have no record of is either 
long-cleaned-up
+        // (committed) or never existed. Either way: visible/committed-default.
         MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, 
txnStore);
         tb.checkIfTBRecoverCompletely().get();
 
-        // Never seen — must be filtered as aborted (orphan or long-cleaned).
-        assertThat(tb.isTxnAborted(new TxnID(99, 99), 
PositionFactory.create(1, 0))).isTrue();
+        assertThat(tb.isTxnAborted(new TxnID(99, 99), 
PositionFactory.create(1, 0))).isFalse();
     }
 
     @Test
-    public void recovery_rebuildsOpenTxnStateFromOpRecords() throws Exception {
-        // Pre-populate: one OPEN txn with two writes on this segment, one 
COMMITTED txn with one
-        // (lingering) write, and one txn touching a different segment (must 
not appear here).
-        TxnID openTxn = new TxnID(1, 1);
-        TxnID committedTxn = new TxnID(1, 2);
-        TxnID otherSegTxn = new TxnID(1, 3);
+    public void restartAfterCommit_committedDataStillVisible() throws 
Exception {
+        // The bug P3.5 fixes: after a TB restart on a topic with old 
committed transactional
+        // data, the original /txn/id/<txnId> headers may have been GC'd. With 
P3's
+        // "unknown → aborted" default + cleanup-on-apply, the committed 
messages would be
+        // wrongly filtered. With P3.5's "unknown → visible (committed) below 
watermark", they
+        // are not.
+        //
+        // Simulate: pre-populate the durable per-segment watermark 
(representing a TB instance
+        // that previously processed commits and persisted the resolved-below 
mark). The header
+        // and /txn/op records are absent — GC ran. Construct a fresh TB and 
verify the
+        // committed txn's messages are visible.
+
+        TxnID oldCommittedTxn = new TxnID(7, 42);
+        // Watermark sits at 5:0 — below this is fully resolved per the 
previous TB instance.
+        txnStore.casSegmentWatermark(SEGMENT,
+                new 
org.apache.pulsar.broker.transaction.metadata.SegmentWatermark(5, 0),
+                Optional.of(-1L)).get();
 
-        createOpenHeader(openTxn);
-        txnStore.appendOp(TxnIds.toKey(openTxn),
-                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 1L, null)).get();
-        txnStore.appendOp(TxnIds.toKey(openTxn),
-                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 2L, null)).get();
+        MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, 
txnStore);
+        tb.checkIfTBRecoverCompletely().get();
 
-        txnStore.createHeader(TxnIds.toKey(committedTxn),
-                new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
-                        Instant.ofEpochMilli(1000), 
Instant.ofEpochMilli(2000))).get();
-        txnStore.appendOp(TxnIds.toKey(committedTxn),
-                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 3L, null)).get();
+        // The bug fix: isTxnAborted no longer treats unknown-below-watermark 
as aborted, so
+        // committed messages whose original /txn/id headers have been GC'd 
are still visible.
+        assertThat(tb.isTxnAborted(oldCommittedTxn, PositionFactory.create(3, 
5))).isFalse();
+        // With no open txns recovered, maxReadPosition can advance past the 
watermark to LAC —
+        // the absence of /txn/op records is positive evidence (by the 
orphan-elimination
+        // invariant) that no unresolved txn data sits above the watermark.
+        
assertThat(tb.getMaxReadPosition()).isEqualTo(ledger.getLastConfirmedEntry());
+    }
+
+    @Test
+    public void restartAfterAbort_abortedTxnStillFiltered() throws Exception {
+        // Mirror of the commit scenario: an aborted txn's durable record 
persists in
+        // /txn/segment-state/aborted/<encoded-segment>:<txnId> with an index entry on 
max position.
+        // Across restarts, isTxnAborted continues to return true even if the 
original
+        // /txn/id/<txnId> header has been GC'd.
+
+        TxnID oldAbortedTxn = new TxnID(7, 99);
+        String txnIdKey = TxnIds.toKey(oldAbortedTxn);
+        txnStore.casSegmentWatermark(SEGMENT,
+                new 
org.apache.pulsar.broker.transaction.metadata.SegmentWatermark(5, 0),
+                Optional.of(-1L)).get();
+        // Persisted aborted record. Header doesn't exist (GC'd).
+        txnStore.putAbortedTxn(SEGMENT, txnIdKey, 3L, 5L).get();
+
+        MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, 
txnStore);
+        tb.checkIfTBRecoverCompletely().get();
 
-        createOpenHeader(otherSegTxn);
-        txnStore.appendOp(TxnIds.toKey(otherSegTxn),
-                new TxnOp(TxnOpKind.WRITE, 
"segment://public/default/topic/other-seg", null, 5L, 9L, null)).get();
+        assertThat(tb.isTxnAborted(oldAbortedTxn, PositionFactory.create(3, 
5))).isTrue();
+    }
+
+    @Test
+    public void recoveryDiscoveredOpenTxn_pinsAtWatermark() throws Exception {
+        // /txn/op records exist for an open txn (broker was processing 
publishes for txn T;
+        // T's resolution hadn't yet completed; watermark was persisted at 5:0 
before the crash).
+        // On restart the TB pins maxReadPosition at the persisted watermark — 
it doesn't know
+        // T's real positions and must not expose any segment data above the 
watermark until T
+        // resolves.
+
+        TxnID openTxn = new TxnID(1, 1);
+        createOpenHeader(openTxn);
+        // Position values in the legacy /txn/op format are no longer read by 
recovery; only the
+        // existence of the record matters (it tells recovery this txn is 
involved on this segment).
+        txnStore.appendOp(TxnIds.toKey(openTxn),
+                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 0L, 0L, null)).get();
+        // An OPEN txn whose only op record is on a *different* segment must 
not be recovered here.
+        TxnID otherSegmentTxn = new TxnID(2, 2);
+        createOpenHeader(otherSegmentTxn);
+        txnStore.appendOp(TxnIds.toKey(otherSegmentTxn),
+                new TxnOp(TxnOpKind.WRITE, 
"segment://public/default/topic/0000-ffff-1",
+                        null, 0L, 0L, null)).get();
+        txnStore.casSegmentWatermark(SEGMENT,
+                new 
org.apache.pulsar.broker.transaction.metadata.SegmentWatermark(5, 0),
+                Optional.of(-1L)).get();
 
         MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, 
txnStore);
         tb.checkIfTBRecoverCompletely().get();
 
-        // openTxn pins max read position at min(5:1, 5:2) - 1 = 5:0.
         
assertThat(tb.getMaxReadPosition()).isEqualTo(PositionFactory.create(5, 0));
+        // Only the same-segment txn is tracked; the other-segment op record 
isn't recovered here.
         assertThat(tb.getOngoingTxnCount()).isOne();
-        assertThat(tb.isTxnAborted(openTxn, PositionFactory.create(5, 
1))).isFalse();
-        // Committed-txn lingering writes should not pin max read position; 
the txn isn't in OPEN set.
-        // (Cleanup is async; we don't assert on its completion here.)
+        // An OPEN recovery-discovered txn is not in the aborted set.
+        assertThat(tb.isTxnAborted(openTxn, PositionFactory.create(5, 
0))).isFalse();
+    }
+
+    @Test
+    public void recoveryDiscoveredAbortedTxns_filteredImmediatelyAfterRecovery() throws Exception {
+        // Regression for the recovery race: txns whose headers are ABORTED and whose only remaining
+        // trace is a leftover /txn/op record — no durable aborted record was ever written (the
+        // previous broker crashed before applyTerminalNow persisted it). isTxnAborted must return
+        // true for every such txn the instant recovery completes. ABORTED recovery entries aren't
+        // watermark-pinned, so any visibility window here would expose the aborted data.
+        //
+        // The race only opens with multiple terminal txns: the first applyTerminalNow runs
+        // synchronously (stateTail was complete) and adds itself, but a second is queued behind the
+        // first's in-flight persist chain. We force exactly that by hanging the first putAbortedTxn,
+        // so the second txn's queued apply cannot run before recovery completes — only the
+        // synchronous hydrate in applyHeaderForRecovery makes it filtered in time.
+
+        TxnID abortedA = new TxnID(7, 42);
+        TxnID abortedB = new TxnID(7, 43);
+        for (TxnID t : new TxnID[] {abortedA, abortedB}) {
+            createOpenHeader(t);
+            abortTxn(t);
+            // Leftover op record on this segment; no putAbortedTxn — durable aborted record is gone.
+            txnStore.appendOp(TxnIds.toKey(t),
+                    new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 0L, 0L, null)).get();
+        }
+
+        // Spy the store and hang the first persisted-aborted-record write so its apply chain stays
+        // in flight, blocking any txn queued behind it on stateTail.
+        TxnMetadataStore spied = org.mockito.Mockito.spy(txnStore);
+        CompletableFuture<Stat> firstPut = new CompletableFuture<>();
+        java.util.concurrent.atomic.AtomicBoolean first = new java.util.concurrent.atomic.AtomicBoolean(true);
+        doAnswer(inv -> {
+            if (first.getAndSet(false)) {
+                return firstPut;
+            }
+            return inv.callRealMethod();
+        }).when(spied).putAbortedTxn(any(), any(), org.mockito.ArgumentMatchers.anyLong(),
+                org.mockito.ArgumentMatchers.anyLong());
+
+        MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, spied);
+        tb.checkIfTBRecoverCompletely().get();
+
+        // Both must be filtered as of recovery completion — without the synchronous hydrate, the
+        // txn queued behind the hung persist would still read as visible here.
+        assertThat(tb.isTxnAborted(abortedA, PositionFactory.create(10, 0))).isTrue();
+        assertThat(tb.isTxnAborted(abortedB, PositionFactory.create(10, 0))).isTrue();
+
+        // Release the hung write so the apply chain can drain cleanly.
+        firstPut.complete(null);
+    }
+
+    @Test
+    public void recoveryDiscoveredAbortedTxn_persistsAbortedRecordAtSegmentLac() throws Exception {
+        // Regression for the prune-key bug: a recovery-discovered aborted txn has unknown
+        // positions, so persistAbortedRecord falls back to the segment LAC — NOT the durable
+        // watermark. The txn's data sits above the watermark, so keying the record on the
+        // watermark would let trim-pruning drop it while the data is still readable.
+        TxnID abortedTxn = new TxnID(8, 99);
+        String txnIdKey = TxnIds.toKey(abortedTxn);
+        createOpenHeader(abortedTxn);
+        abortTxn(abortedTxn);
+        txnStore.appendOp(txnIdKey, new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 0L, 0L, null)).get();
+        // Watermark (5,0) is well below the segment LAC (10,0).
+        txnStore.casSegmentWatermark(SEGMENT,
+                new org.apache.pulsar.broker.transaction.metadata.SegmentWatermark(5, 0),
+                Optional.of(-1L)).get();
+
+        MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, txnStore);
+        tb.checkIfTBRecoverCompletely().get();
+
+        // The aborted record is persisted on stateTail after recovery completes; await it and
+        // assert its stored max position is the LAC (10,0), not the watermark (5,0).
+        Awaitility.await().untilAsserted(() -> {
+            var opt = store.get(TxnPaths.segmentAbortedTxnPath(SEGMENT, txnIdKey)).get();
+            assertThat(opt).isPresent();
+            AbortedTxnRecord rec = TxnMetadataStore.fromJson(opt.get().getValue(), AbortedTxnRecord.class);
+            assertThat(rec.maxLedgerId()).isEqualTo(10L);
+            assertThat(rec.maxEntryId()).isEqualTo(0L);
+        });
+    }
+
+    @Test
+    public void publishOrdering_opRecordWrittenBeforeMlAppend() throws Exception {
+        // The orphan-elimination invariant: when appendBufferToTxn returns, both /txn/op and
+        // the ML entry exist. With the reversed ordering, the op record is written first; if
+        // the ML append then fails, the op record is still present and the TC's timeout sweep
+        // will abort the txn and clean up. The reverse — entry without op — cannot happen by
+        // construction.
+
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, txnStore);
+        tb.checkIfTBRecoverCompletely().get();
+
+        tb.appendBufferToTxn(txnId, 0, payload("a")).get();
+
+        // /txn/op record should exist for this txn on this segment.
+        java.util.List<TxnOp> hits = new java.util.ArrayList<>();
+        txnStore.listWritesBySegment(SEGMENT,
+                new org.apache.pulsar.metadata.api.ScanConsumer() {
+                    @Override
+                    public void onNext(org.apache.pulsar.metadata.api.GetResult r) {
+                        hits.add(TxnMetadataStore.fromJson(r.getValue(), TxnOp.class));
+                    }
+                    @Override public void onError(Throwable t) { }
+                    @Override public void onCompleted() { }
+                }).get();
+        assertThat(hits).hasSize(1);
+        assertThat(hits.get(0).getKind()).isEqualTo(TxnOpKind.WRITE);
+        // The WRITE op record carries sentinel positions (0/0); positions live in TB's
+        // in-memory tracking, not in the op record.
+        assertThat(hits.get(0).getLedgerId()).isZero();
+        assertThat(hits.get(0).getEntryId()).isZero();
     }
 
     // ---- helpers -----------------------------------------------------------

Reply via email to