This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7e4de065bb0 [improve][broker] PIP-473 P3.5: durable per-segment
visibility state (#25821)
7e4de065bb0 is described below
commit 7e4de065bb04ac5727fd5db9c3a0f41a7b709aa3
Author: Matteo Merli <[email protected]>
AuthorDate: Thu May 21 16:28:51 2026 +0100
[improve][broker] PIP-473 P3.5: durable per-segment visibility state
(#25821)
---
.../buffer/impl/MetadataTransactionBuffer.java | 491 ++++++++++++++-------
.../transaction/metadata/AbortedTxnRecord.java | 35 ++
.../transaction/metadata/SegmentWatermark.java | 36 ++
.../transaction/metadata/TxnMetadataStore.java | 78 ++++
.../broker/transaction/metadata/TxnPaths.java | 117 ++++-
.../buffer/impl/MetadataTransactionBufferTest.java | 222 ++++++++--
6 files changed, 804 insertions(+), 175 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index 46326b91afd..845c1c0328a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -41,6 +41,7 @@ import
org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.broker.transaction.metadata.SegmentWatermark;
import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
import org.apache.pulsar.broker.transaction.metadata.TxnIds;
import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
@@ -58,35 +59,35 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.ScanConsumer;
/**
- * {@link TransactionBuffer} for {@code segment://} topics that reads truth
from the metadata-store
- * transaction layout (PIP-473) rather than from a per-topic snapshot log.
+ * {@link TransactionBuffer} for {@code segment://} topics that reads truth
from the
+ * metadata-store transaction layout (PIP-473).
*
- * <p>Lifecycle:
- * <ul>
- * <li><b>Publish</b> — {@link #appendBufferToTxn} reads the txn header
(cache-first), appends the
- * entry to the managed ledger, then appends a {@link TxnOp} record under
- * {@code /txn/op/<txnId>-<seq>}. Both must succeed before we ack the
publisher.</li>
- * <li><b>State transitions</b> — driven by {@code
/txn/segment-events/<segment>-*} sequence
- * events. The events are wake-ups; the truth is the header. On each
notification we
- * re-read headers for every currently-open txn and apply the resulting
state changes.</li>
- * <li><b>Recovery</b> (Option C) — scan {@code idx:writes-by-segment} for
this segment, group by
- * {@code txnId}, fetch each header, and seed the in-memory cache. Then
subscribe to the
- * event stream for forward updates.</li>
- * </ul>
+ * <p><b>Publish-path ordering.</b> {@link #appendBufferToTxn} writes the {@code /txn/op} record
+ * <em>before</em> the managed-ledger append, which rules out orphaned ledger entries: a crash
+ * leaves either (a) no entry and no op record, or (b) an op record with no entry
+ * (TC times out → ABORTED → cleanup). There's never a ledger entry with no op record. The invariant
+ * <i>every transactional entry in the segment has a corresponding {@code /txn/op} record at append
+ * time</i> lets recovery find every txn that wrote here by scanning {@code /txn/op}, without replay.
*
- * <p><b>TC ordering contract.</b> There is a TOCTOU window between the header
authorization read
- * in {@link #appendBufferToTxn} and the managed-ledger append: the TC may
flip the header (commit
- * or abort) in between, and the entry still lands. On commit that's harmless
— the message is
- * visible. On abort, the subsequent segment-event delivery marks the txn
ABORTED in the cache and
- * {@link #isTxnAborted} filters it. This relies on the TC publishing the
segment event <em>after</em>
- * the header CAS lands so a participant that lost the race always learns the
decision. The legacy
- * {@code TopicTransactionBuffer} has the same window with marker-message
ordering.
+ * <p><b>Durable visibility state.</b> The TB persists a per-segment {@link SegmentWatermark}
+ * record (the resolved-below mark, under {@code /txn/segment-state/}) plus one aborted-txn record
+ * per aborted txn with still-readable data; the exact key layouts are documented on those record
+ * types. Together they let {@link #isTxnAborted} answer correctly for as long as the data is in the
+ * segment ML, even after the original {@code /txn/id/<txnId>} headers have been GC'd. The aborted
+ * records carry the txn's max position via a secondary index so the TB can range-delete them when
+ * the segment ML trims its older data.
+ *
+ * <p><b>Recovery.</b> Load the durable watermark + aborted set; scan {@code
/txn/op} for this
+ * segment to discover any txns still open at the time of the previous
shutdown (their first
+ * positions aren't known — the in-memory watermark stays pinned at the
durable value until they
+ * resolve); subscribe to the segment-event stream.
*
- * <p><b>In-memory growth.</b> Terminal txns stay in the {@code txns} cache
for the segment's
- * lifetime so {@code isTxnAborted} can answer authoritatively for dispatcher
reads — evicting a
- * COMMITTED entry would mean the default "unknown → aborted" filter wrongly
hides its messages.
- * Long-running segments with high txn turnover will accumulate cached
entries. Cache pruning tied
- * to data-ledger trimming / header GC is a P5/P6 concern.
+ * <p><b>TC ordering contract.</b> There is a TOCTOU window between the header
authorization read
+ * and the {@code /txn/op} write: the TC may flip the header (commit or abort)
in between. The op
+ * record still lands. On commit that's harmless. On abort, the subsequent
segment-event delivery
+ * marks the txn ABORTED and the per-txn aborted record is written before the
op record is deleted,
+ * so {@link #isTxnAborted} filters subsequent reads. This relies on the TC
publishing the segment
+ * event <em>after</em> the header CAS.
*/
@CustomLog
public class MetadataTransactionBuffer implements TransactionBuffer {
@@ -101,15 +102,35 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
private volatile AutoCloseable subscription;
private volatile boolean closed;
- /** Guards {@link #txns} + {@link #maxReadPosition} + {@link
#lastDispatchable}. */
+ /** Guards mutable state below. */
private final Object lock = new Object();
- /** Cached per-txn state, populated by appendBufferToTxn and refreshed by
event reconcile. */
+ /** In-memory per-txn state for txns this segment is involved in. */
private final Map<String, TxnEntry> txns = new HashMap<>();
- private Position maxReadPosition;
+ /** Aborted-txn set hydrated from durable state at recovery, updated on
abort-apply. */
+ private final Set<String> abortedTxns = new HashSet<>();
+
+ /** Count of OPEN txns we discovered at recovery whose first position we
don't know. */
+ private int recoveryDiscoveredOpen;
+
+ /** Durable watermark, mirrored in memory. May be null on a fresh segment.
*/
+ private SegmentWatermark watermark;
+ /** Version of the durable watermark record; -1 if it doesn't exist yet. */
+ private long watermarkVersion = -1L;
+
+ /** Latest dispatched position from non-txn publishes — the natural
ceiling when no opens pin. */
private Position lastDispatchable;
+ /** Current maxReadPosition; never moves above the watermark while
recovery-discovered opens exist. */
+ private Position maxReadPosition;
+
+ /**
+ * Serialised chain for watermark-persist + op-record-cleanup. Each apply
enqueues itself on the
+ * tail so we never have two in-flight watermark CASes racing.
+ */
+ private CompletableFuture<Void> stateTail =
CompletableFuture.completedFuture(null);
+
private final LongAdder committedCount = new LongAdder();
private final LongAdder abortedCount = new LongAdder();
@@ -136,46 +157,82 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
}
subscription = handle;
- // Scan all /txn/op records for this segment, group by txnId.
- Map<String, List<Position>> opsByTxn = new ConcurrentHashMap<>();
- txnStore.listWritesBySegment(segmentName, new ScanConsumer() {
- @Override
- public void onNext(GetResult r) {
- TxnOp op = TxnMetadataStore.fromJson(r.getValue(),
TxnOp.class);
- String txnIdKey =
TxnPaths.txnIdFromOpPath(r.getStat().getPath());
- if (txnIdKey == null) {
- return;
- }
- opsByTxn.computeIfAbsent(txnIdKey, k -> new ArrayList<>())
- .add(PositionFactory.create(op.getLedgerId(),
op.getEntryId()));
- }
+ // 1. Load durable watermark.
+ CompletableFuture<Void> watermarkLoad =
txnStore.getSegmentWatermark(segmentName)
+ .thenAccept(opt -> {
+ if (opt.isPresent()) {
+ synchronized (lock) {
+ watermark = opt.get().value();
+ watermarkVersion = opt.get().version();
+ // Initialise maxReadPosition at the watermark so
we don't expose anything
+ // above it until in-memory state catches up.
+ maxReadPosition = PositionFactory.create(
+ watermark.ledgerId(), watermark.entryId());
+ }
+ }
+ });
- @Override
- public void onError(Throwable throwable) {
- // Recovery still fails loudly via the scan's returned future
and the terminal
- // whenComplete below; logging here captures the cause with
segment context.
- log.warn().attr("segment", segmentName).exception(throwable)
- .log("TB recovery scan errored");
- }
+ // 2. Load the aborted-txn set (scan by segment-scoped index range).
+ CompletableFuture<Void> abortedLoad = watermarkLoad.thenCompose(__ ->
+ txnStore.scanAbortedTxns(segmentName,
+
TxnPaths.abortedByPositionSegmentLowerBound(segmentName),
+
TxnPaths.abortedByPositionSegmentUpperBound(segmentName),
+ new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ String txnIdKey =
TxnPaths.txnIdFromAbortedPath(r.getStat().getPath());
+ if (txnIdKey != null) {
+ synchronized (lock) {
+ abortedTxns.add(txnIdKey);
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn().attr("segment",
segmentName).exception(throwable)
+ .log("Aborted-txn scan errored during
recovery");
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }));
+
+ // 3. Scan /txn/op for this segment to discover txns that were open at
last shutdown.
+ Map<String, Boolean> writeOpsByTxn = new ConcurrentHashMap<>();
+ CompletableFuture<Void> opsLoad = abortedLoad.thenCompose(__ ->
+ txnStore.listWritesBySegment(segmentName, new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ String txnIdKey =
TxnPaths.txnIdFromOpPath(r.getStat().getPath());
+ if (txnIdKey != null) {
+ writeOpsByTxn.put(txnIdKey, Boolean.TRUE);
+ }
+ }
- @Override
- public void onCompleted() {
- }
- })
- .thenCompose(__ -> {
- // Fan out one header read per distinct txnId; build initial state.
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn().attr("segment",
segmentName).exception(throwable)
+ .log("Op-scan errored during recovery");
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }));
+
+ // 4. For each discovered txn, fetch its header and seed the in-memory
state.
+ opsLoad.thenCompose(__ -> {
List<CompletableFuture<Void>> reads = new ArrayList<>();
- opsByTxn.forEach((txnIdKey, positions) -> reads.add(
- txnStore.getHeader(txnIdKey).thenAccept(opt ->
applyHeaderForRecovery(
- txnIdKey, opt, positions))));
+ writeOpsByTxn.keySet().forEach(txnIdKey -> reads.add(
+ txnStore.getHeader(txnIdKey).thenAccept(opt ->
+ applyHeaderForRecovery(txnIdKey, opt))));
return FutureUtil.waitForAll(reads);
})
.whenComplete((__, err) -> {
if (err != null) {
log.error().attr("segment",
segmentName).exception(err).log("TB recovery failed");
- // Close the subscription we opened above so the listener
doesn't outlive a
- // failed-to-recover TB instance (closeAsync may never be
called if recovery never
- // succeeded).
closeSubscriptionQuietly();
recoveryFuture.completeExceptionally(err);
return;
@@ -184,22 +241,33 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
recomputeMaxReadPositionLocked();
}
recoveryFuture.complete(null);
- // Drain any events that fired between subscribe and now —
triggerReconcile short-
- // circuits while recoveryFuture is not done, so we explicitly
kick a reconcile pass
- // now to pick up state transitions whose only notification landed
in that window.
+ // Drain any events that fired between subscribe and now.
triggerReconcile();
});
}
- private void applyHeaderForRecovery(String txnIdKey,
Optional<Versioned<TxnHeader>> opt, List<Position> positions) {
+ private void applyHeaderForRecovery(String txnIdKey,
Optional<Versioned<TxnHeader>> opt) {
TxnState state = opt.map(v ->
v.value().getState()).orElse(TxnState.ABORTED);
- Position first =
positions.stream().min(Position::compareTo).orElse(null);
synchronized (lock) {
- txns.put(txnIdKey, new TxnEntry(state, first));
+ TxnEntry entry = new TxnEntry(state);
+ if (state == TxnState.OPEN) {
+ entry.recoveryDiscovered = true;
+ recoveryDiscoveredOpen++;
+ } else if (state == TxnState.ABORTED) {
+ // Hydrate the aborted set now, under the lock, so
isTxnAborted is correct the
+ // instant recoveryFuture completes. The terminal apply that
would otherwise add
+ // this txn runs later on stateTail (after recovery
completes), leaving a window
+ // in which the txn's data — which isn't watermark-pinned,
since ABORTED entries
+ // don't set recoveryDiscovered — would read as visible. add()
is idempotent with
+ // applyTerminalNow's aborted-set update.
+ abortedTxns.add(txnIdKey);
+ }
+ txns.put(txnIdKey, entry);
}
- // Schedule op-record cleanup for terminal txns (best-effort, async).
+ // Terminal txns with leftover /txn/op records still need their
outcome materialised and
+ // the records cleaned up — enqueue an apply.
if (state.isTerminal()) {
- cleanupOpRecords(txnIdKey);
+ enqueueApplyTerminal(txnIdKey, state);
}
}
@@ -207,7 +275,6 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long
sequenceId, ByteBuf buffer) {
- // Retain so the buffer survives the chain — release in the terminal
handlers.
buffer.retain();
return recoveryFuture.thenCompose(__ -> internalAppend(txnId, buffer))
.whenComplete((p, ex) -> buffer.release());
@@ -225,8 +292,16 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
return FutureUtil.failedFuture(new
BrokerServiceException.NotAllowedException(
"Transaction " + txnId + " is already " +
state + " — TxnConflict"));
}
- return appendToLedger(buffer)
- .thenCompose(position -> recordOp(txnId, txnIdKey,
position));
+ // 1. Write /txn/op first. Positions are unknown at this
point; the WRITE-kind
+ // record uses sentinel 0/0 (positions live in the TB's
in-memory tracking).
+ TxnOp op = new TxnOp(TxnOpKind.WRITE, segmentName, null,
0L, 0L, null);
+ return txnStore.appendOp(txnIdKey, op).thenCompose(opStat
->
+ // 2. ML append. By the invariant above, /txn/op
is durable before any
+ // ledger entry exists for this op.
+ appendToLedger(buffer).thenApply(position -> {
+ trackPosition(txnIdKey, position);
+ return position;
+ }));
});
}
@@ -240,7 +315,7 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
return txnStore.getHeader(txnIdKey).thenApply(opt -> {
TxnState state = opt.map(v ->
v.value().getState()).orElse(TxnState.ABORTED);
synchronized (lock) {
- txns.putIfAbsent(txnIdKey, new TxnEntry(state, null));
+ txns.putIfAbsent(txnIdKey, new TxnEntry(state));
}
return state;
});
@@ -262,36 +337,39 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
return result;
}
- private CompletableFuture<Position> recordOp(TxnID txnId, String txnIdKey,
Position position) {
- TxnOp op = new TxnOp(TxnOpKind.WRITE, segmentName, null,
- position.getLedgerId(), position.getEntryId(), null);
- return txnStore.appendOp(txnIdKey, op).thenApply(stat -> {
- synchronized (lock) {
- TxnEntry entry = txns.get(txnIdKey);
- // Only an OPEN entry pins maxReadPosition. If a concurrent
reconcile flipped this
- // txn to terminal between the cache-first authorization read
and this update, do
- // NOT resurrect it as OPEN — the ML append still lands but
isTxnAborted will mask
- // it for aborted txns (see class javadoc on the publish-side
TOCTOU window).
- if (entry != null && entry.state == TxnState.OPEN) {
- if (entry.firstPosition == null ||
position.compareTo(entry.firstPosition) < 0) {
- entry.firstPosition = position;
- recomputeMaxReadPositionLocked();
- }
- }
+ private void trackPosition(String txnIdKey, Position position) {
+ synchronized (lock) {
+ TxnEntry entry = txns.get(txnIdKey);
+ // Only an OPEN, non-recovery entry tracks positions.
Recovery-discovered entries pin
+ // at the watermark and shouldn't be re-keyed by later appends (we
don't know the
+ // earliest position).
+ if (entry == null || entry.state != TxnState.OPEN ||
entry.recoveryDiscovered) {
+ return;
}
- return position;
- });
+ if (entry.firstPosition == null ||
position.compareTo(entry.firstPosition) < 0) {
+ entry.firstPosition = position;
+ }
+ if (entry.lastPosition == null ||
position.compareTo(entry.lastPosition) > 0) {
+ entry.lastPosition = position;
+ }
+ recomputeMaxReadPositionLocked();
+ }
}
// ---- Reconcile (event-driven) -----------------------------------------
private void triggerReconcile() {
- if (closed || !recoveryFuture.isDone()) {
+ if (closed || !recoveryFuture.isDone() ||
recoveryFuture.isCompletedExceptionally()) {
return;
}
Set<String> snapshot;
synchronized (lock) {
- snapshot = new HashSet<>(openTxnsLocked());
+ snapshot = new HashSet<>();
+ for (Map.Entry<String, TxnEntry> e : txns.entrySet()) {
+ if (e.getValue().state == TxnState.OPEN) {
+ snapshot.add(e.getKey());
+ }
+ }
}
if (snapshot.isEmpty()) {
return;
@@ -300,7 +378,9 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
for (String txnIdKey : snapshot) {
reads.add(txnStore.getHeader(txnIdKey).thenAccept(opt -> {
TxnState newState = opt.map(v ->
v.value().getState()).orElse(TxnState.ABORTED);
- applyReconciledState(txnIdKey, newState);
+ if (newState.isTerminal()) {
+ enqueueApplyTerminal(txnIdKey, newState);
+ }
}));
}
FutureUtil.waitForAll(reads).whenComplete((__, err) -> {
@@ -310,75 +390,182 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
});
}
- private void applyReconciledState(String txnIdKey, TxnState newState) {
- boolean cleanup = false;
+ /**
+ * Enqueue the durable side-effects for a txn that has been observed terminal: write the
+ * aborted record (if any), advance the persisted watermark, delete the {@code /txn/op}
+ * records. Serialised through {@link #stateTail} so concurrent applies don't race the
+ * watermark CAS.
+ *
+ * <p>NOTE(review): {@link #applyTerminalNow} marks the in-memory entry terminal before the
+ * durable writes, and {@link #triggerReconcile} only re-reads OPEN txns, so a failed apply is
+ * not re-enqueued by reconcile despite the log text below. The {@code /txn/op} records are
+ * deleted last, so a failure before that step leaves them for the next recovery to re-apply.
+ */
+ private void enqueueApplyTerminal(String txnIdKey, TxnState newState) {
+ synchronized (lock) {
+ stateTail = stateTail.thenCompose(__ -> applyTerminalNow(txnIdKey,
newState))
+ .exceptionally(err -> {
+ log.warn().attr("segment", segmentName).attr("txnId",
txnIdKey)
+ .exception(err).log("Apply-terminal failed;
will retry on next reconcile");
+ return null;
+ });
+ }
+ }
+
+ private CompletableFuture<Void> applyTerminalNow(String txnIdKey, TxnState
newState) {
+ // Snapshot the entry under the lock — we need its positions /
recovery flag to decide
+ // what to persist.
+ boolean alreadyTerminal;
+ Position lastPos;
synchronized (lock) {
TxnEntry entry = txns.get(txnIdKey);
- if (entry == null || entry.state == newState) {
- return;
+ if (entry == null) {
+ return CompletableFuture.completedFuture(null);
}
- entry.state = newState;
- if (newState.isTerminal()) {
- entry.firstPosition = null;
- cleanup = true;
+ alreadyTerminal = entry.state.isTerminal();
+ lastPos = entry.lastPosition;
+ // Mark in-memory now so subsequent appendBufferToTxn for this txn
fail with TxnConflict.
+ if (!alreadyTerminal) {
+ entry.state = newState;
+ if (entry.recoveryDiscovered) {
+ recoveryDiscoveredOpen = Math.max(0,
recoveryDiscoveredOpen - 1);
+ entry.recoveryDiscovered = false;
+ }
if (newState == TxnState.COMMITTED) {
committedCount.increment();
- } else {
+ } else if (newState == TxnState.ABORTED) {
abortedCount.increment();
+ abortedTxns.add(txnIdKey);
}
recomputeMaxReadPositionLocked();
+ } else if (newState == TxnState.ABORTED) {
+ // Idempotent path — header re-confirms ABORTED. Make sure the
in-memory set holds
+ // it (e.g. after an in-memory rebuild that lost the set).
+ abortedTxns.add(txnIdKey);
}
}
- if (cleanup) {
- cleanupOpRecords(txnIdKey);
- }
+
+ // Persist aborted record if this is an abort.
+ CompletableFuture<Void> persistAborted = (newState == TxnState.ABORTED)
+ ? persistAbortedRecord(txnIdKey, lastPos)
+ : CompletableFuture.completedFuture(null);
+
+ return persistAborted
+ .thenCompose(__ -> persistWatermarkIfAdvanced())
+ .thenCompose(__ ->
txnStore.deleteWriteOpsForSegmentAndTxn(segmentName, txnIdKey));
}
/**
- * Delete every {@code /txn/op} record for {@code (this segment,
txnIdKey)}. Best-effort —
- * failures are logged and retried by the next reconcile.
+ * Write the per-segment aborted-txn record for {@code txnIdKey} with the txn's max position in
+ * this segment. The stored position is the prune key: trim-driven pruning
drops an aborted
+ * record once the segment trims past it, so it must be at least as high
as the txn's highest
+ * data position, or the record would be dropped while its data is still
readable.
+ *
+ * <p>When the positions are unknown (a txn discovered at recovery: {@link #trackPosition} never
+ * records its positions, old or new) we fall back to the current segment LAC. Any of the txn's
+ * appends completed by now is at or below the LAC — a correct conservative upper bound. The durable
+ * watermark would be wrong here: the txn's data sits <em>above</em> the
watermark, so pruning
+ * keyed on the watermark would discard the record too early.
*/
- private void cleanupOpRecords(String txnIdKey) {
- txnStore.deleteWriteOpsForSegmentAndTxn(segmentName, txnIdKey)
- .exceptionally(err -> {
- log.warn().attr("segment", segmentName).attr("txnId",
txnIdKey).exception(err)
- .log("Op-record cleanup failed; will retry on next
reconcile");
- return null;
- });
+ private CompletableFuture<Void> persistAbortedRecord(String txnIdKey,
Position lastPos) {
+ long maxLedger;
+ long maxEntry;
+ if (lastPos != null) {
+ maxLedger = lastPos.getLedgerId();
+ maxEntry = lastPos.getEntryId();
+ } else {
+ Position lac = ledger.getLastConfirmedEntry();
+ maxLedger = lac == null ? 0L : lac.getLedgerId();
+ maxEntry = lac == null ? 0L : lac.getEntryId();
+ }
+ return txnStore.putAbortedTxn(segmentName, txnIdKey, maxLedger,
maxEntry).thenApply(s -> null);
}
- // ---- maxReadPosition ---------------------------------------------------
-
- private Set<String> openTxnsLocked() {
- Set<String> open = new HashSet<>();
- for (Map.Entry<String, TxnEntry> e : txns.entrySet()) {
- if (e.getValue().state == TxnState.OPEN) {
- open.add(e.getKey());
+ /**
+ * If the in-memory watermark position has advanced beyond the durable
one, CAS-write the new
+ * value. Idempotent and self-skipping when there's nothing to do.
+ */
+ private CompletableFuture<Void> persistWatermarkIfAdvanced() {
+ SegmentWatermark toWrite;
+ long expectedVersion;
+ synchronized (lock) {
+ Position desired = maxReadPosition;
+ if (desired == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ if (watermark != null
+ && watermark.ledgerId() == desired.getLedgerId()
+ && watermark.entryId() == desired.getEntryId()) {
+ return CompletableFuture.completedFuture(null);
}
+ // Only advance forward.
+ if (watermark != null) {
+ Position existing =
PositionFactory.create(watermark.ledgerId(), watermark.entryId());
+ if (desired.compareTo(existing) <= 0) {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+ toWrite = new SegmentWatermark(desired.getLedgerId(),
desired.getEntryId());
+ expectedVersion = watermarkVersion;
}
- return open;
+ Optional<Long> expected = Optional.of(expectedVersion);
+ return txnStore.casSegmentWatermark(segmentName, toWrite, expected)
+ .thenAccept(stat -> {
+ synchronized (lock) {
+ watermark = toWrite;
+ watermarkVersion = stat.getVersion();
+ }
+ })
+ .exceptionallyCompose(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof
MetadataStoreException.BadVersionException) {
+ // Our in-memory version is stale (a concurrent writer
moved the record).
+ // Re-read so the next enqueued apply CASes against
the current version
+ // instead of looping on the stale one. Still
propagate this failure so the
+ // caller logs+retries; the retry now has a fresh
version to work with.
+ return
txnStore.getSegmentWatermark(segmentName).thenAccept(opt -> {
+ if (opt.isPresent()) {
+ synchronized (lock) {
+ watermark = opt.get().value();
+ watermarkVersion = opt.get().version();
+ }
+ }
+ }).thenCompose(__ -> FutureUtil.failedFuture(cause));
+ }
+ return FutureUtil.failedFuture(cause);
+ });
}
+ // ---- maxReadPosition ---------------------------------------------------
+
private void recomputeMaxReadPositionLocked() {
- Position min = null;
- for (TxnEntry e : txns.values()) {
- if (e.state == TxnState.OPEN && e.firstPosition != null) {
- if (min == null || e.firstPosition.compareTo(min) < 0) {
- min = e.firstPosition;
+ Position next;
+ Position watermarkPos = (watermark == null) ? null
+ : PositionFactory.create(watermark.ledgerId(),
watermark.entryId());
+
+ if (recoveryDiscoveredOpen > 0) {
+ // Pinned at the durable watermark while any recovery-discovered
open txn remains:
+ // we don't know their first positions and mustn't advance past
them.
+ next = watermarkPos != null ? watermarkPos : maxReadPosition;
+ } else {
+ Position min = null;
+ for (TxnEntry e : txns.values()) {
+ if (e.state == TxnState.OPEN && e.firstPosition != null) {
+ if (min == null || e.firstPosition.compareTo(min) < 0) {
+ min = e.firstPosition;
+ }
}
}
+ if (min != null) {
+ next = ledger.getPreviousPosition(min);
+ } else {
+ // No open txns pinning anything: free to advance to
last-dispatched.
+ next = lastDispatchable;
+ }
}
- Position next = (min == null) ? lastDispatchable :
ledger.getPreviousPosition(min);
Position prev = maxReadPosition;
maxReadPosition = next;
- // Only fire the callback on forward motion. Initial-state setup at
recovery may move
- // the position backwards (LAC -> previous(firstOpenWrite)); that's
not a "moved forward".
if (next.compareTo(prev) > 0 && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(prev, next);
}
}
- // ---- SPI surface (lifecycle, queries) ---------------------------------
+ // ---- SPI surface -------------------------------------------------------
@Override
public Position getMaxReadPosition() {
@@ -391,13 +578,11 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
public boolean isTxnAborted(TxnID txnID, Position readPosition) {
String key = TxnIds.toKey(txnID);
synchronized (lock) {
- TxnEntry entry = txns.get(key);
- if (entry == null) {
- // No record of this txn — must be either orphan (broker crash
mid-publish) or
- // long-aborted-and-cleaned. Filtering is the safe default.
- return true;
- }
- return entry.state == TxnState.ABORTED;
+ // New semantics (P3.5): default is committed/visible; only txns
explicitly in the
+ // aborted set are filtered. maxReadPosition caps what the
dispatcher delivers — at the
+ // lowest open txn's first write in steady state, or pinned at the
watermark while
+ // recovery-discovered opens remain — so reads above that cap
don't reach this check.
+ return abortedTxns.contains(key);
}
}
@@ -410,6 +595,13 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
synchronized (lock) {
lastDispatchable = position;
recomputeMaxReadPositionLocked();
+ // Persist the new watermark if it advanced as a result of the
non-txn append.
+ stateTail = stateTail.thenCompose(__ ->
persistWatermarkIfAdvanced())
+ .exceptionally(err -> {
+ log.warn().attr("segment", segmentName).exception(err)
+ .log("Watermark persist on normal publish
failed; will retry");
+ return null;
+ });
}
}
@@ -440,19 +632,14 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
@Override
public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
- // No-op for the metadata-driven TB: v5 commits are driven by the TC
writing /txn header CAS
- // and the segment-event stream, not by direct SPI calls.
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
- // No-op (see commitTxn).
return CompletableFuture.completedFuture(null);
}
- // ---- SPI surface (snapshots / readers — unused in v5) -----------------
-
@Override
public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
return CompletableFuture.completedFuture(null);
@@ -526,11 +713,19 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
private static final class TxnEntry {
TxnState state;
- Position firstPosition; // null if no writes on this segment yet, or
after termination
-
- TxnEntry(TxnState state, Position firstPosition) {
+ /** Earliest position the TB itself has seen on this segment for this
txn. */
+ Position firstPosition;
+ /** Latest position the TB itself has seen on this segment for this
txn. */
+ Position lastPosition;
+ /**
+ * True if this entry was created by recovery from a leftover {@code
/txn/op} record —
+ * the TB doesn't know the real positions and pins {@code
maxReadPosition} at the watermark
+ * until this txn resolves.
+ */
+ boolean recoveryDiscovered;
+
+ TxnEntry(TxnState state) {
this.state = state;
- this.firstPosition = firstPosition;
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/AbortedTxnRecord.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/AbortedTxnRecord.java
new file mode 100644
index 00000000000..5893275ae70
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/AbortedTxnRecord.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+/**
+ * Per-aborted-txn durable record for a segment. Stored at
+ * {@code /txn/segment-state/aborted/<encoded-segment>:<txnId>} with
+ * {@code partitionKey = segmentKey(segment)} and a {@code idx:txn-aborted-by-position}
+ * secondary-index entry keyed by {@code <encoded-segment>:<padded-position>}.
+ *
+ * <p>Carries the highest position the TB tracked for the txn in this segment or, when none was
+ * tracked (e.g. a txn discovered at recovery), the segment LAC at abort time as an upper bound.
+ * Position-keyed, so the TB can range-delete records as the ML trims, whatever its storage layout.
+ *
+ * @param maxLedgerId ledger id of the recorded max position of the aborted txn in this segment
+ * @param maxEntryId entry id of the recorded max position of the aborted txn in this segment
+ */
+public record AbortedTxnRecord(long maxLedgerId, long maxEntryId) {
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/SegmentWatermark.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/SegmentWatermark.java
new file mode 100644
index 00000000000..1a9a2be3334
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/SegmentWatermark.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
/**
 * Durable per-segment watermark, stored at {@code /txn/segment-state/watermark/<encoded-segment>}
 * with {@code partitionKey = segmentKey(segment)}.
 *
 * <p>Every transactional message positioned below the watermark is fully resolved. It is either
 * committed (and visible) or belongs to a txn in the segment's aborted-txn set. The dispatcher
 * never reads above this position. The watermark has the same meaning as the transaction
 * buffer's in-memory {@code maxReadPosition}, but it is persisted. This lets the buffer survive a
 * restart, and lets a new {@code EARLIEST} subscription still classify old transactional data
 * after the original {@code /txn/id/<txnId>} headers have been garbage-collected.
 *
 * @param ledgerId managed-ledger ledger id of the watermark position
 * @param entryId  managed-ledger entry id of the watermark position
 */
public record SegmentWatermark(
        long ledgerId,
        long entryId) {
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index 7f95a9be907..323473ac8b1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -303,6 +303,84 @@ public class TxnMetadataStore {
listener, Set.of(new Option.PartitionKey(pk)));
}
+ // ---- Per-segment durable visibility state -----------------------------
+
+ /**
+ * Read the segment's watermark record, or {@link Optional#empty()} if it
doesn't exist (a
+ * fresh segment that has never had a transactional message).
+ */
+ public CompletableFuture<Optional<Versioned<SegmentWatermark>>>
getSegmentWatermark(String segment) {
+ Set<Option> opts = Set.of(new
Option.PartitionKey(TxnPaths.segmentKey(segment)));
+ return store.get(TxnPaths.segmentWatermarkPath(segment), opts)
+ .thenApply(opt -> opt.map(gr -> new Versioned<>(
+ fromJson(gr.getValue(), SegmentWatermark.class),
gr.getStat().getVersion())));
+ }
+
+ /**
+ * CAS-write the segment's watermark. Pass {@link Optional#empty()} for
unconditional create
+ * (must not exist); pass a version from a prior {@link
#getSegmentWatermark} for an update.
+ */
+ public CompletableFuture<Stat> casSegmentWatermark(String segment,
SegmentWatermark watermark,
+ Optional<Long>
expectedVersion) {
+ Set<Option> opts = Set.of(new
Option.PartitionKey(TxnPaths.segmentKey(segment)));
+ return store.put(TxnPaths.segmentWatermarkPath(segment),
toJson(watermark),
+ expectedVersion, opts);
+ }
+
+ /**
+ * Persist a per-aborted-txn record for {@code (segment, txnId)}. Also
writes the
+ * {@link TxnPaths#IDX_TXN_ABORTED_BY_POSITION} secondary-index entry
keyed by the max
+ * position so the TB can range-delete on ML trim.
+ */
+ public CompletableFuture<Stat> putAbortedTxn(String segment, String txnId,
long maxLedgerId,
+ long maxEntryId) {
+ AbortedTxnRecord record = new AbortedTxnRecord(maxLedgerId,
maxEntryId);
+ Option.SecondaryIndex idx = new
Option.SecondaryIndex(TxnPaths.IDX_TXN_ABORTED_BY_POSITION,
+ TxnPaths.abortedByPositionIndexKey(segment, maxLedgerId,
maxEntryId));
+ Set<Option> opts = Set.of(new
Option.PartitionKey(TxnPaths.segmentKey(segment)), idx);
+ return store.put(TxnPaths.segmentAbortedTxnPath(segment, txnId),
toJson(record),
+ Optional.empty(), opts);
+ }
+
+ /**
+ * Stream all aborted-txn records for {@code segment} whose max position
falls in
+ * {@code [fromKeyInclusive, toKeyInclusive]} (use {@code null} on either
bound for
+ * unbounded). Use {@link TxnPaths#abortedByPositionSegmentLowerBound} /
+ * {@link TxnPaths#abortedByPositionSegmentUpperBound} for the
segment-scoped full range.
+ */
+ public CompletableFuture<Void> scanAbortedTxns(String segment,
+ String fromKeyInclusive,
String toKeyInclusive,
+ ScanConsumer consumer) {
+ String segKey = TxnPaths.segmentKey(segment);
+ return store.scanByIndex(TxnPaths.TXN_SEGMENT_ABORTED_PREFIX,
+ TxnPaths.IDX_TXN_ABORTED_BY_POSITION,
+ fromKeyInclusive, toKeyInclusive,
+ gr -> {
+ // Fallback for stores without native indexes: the records
are flat children
+ // of TXN_SEGMENT_ABORTED_PREFIX named "<segKey>:<txnId>"
— match by segKey.
+ return
segKey.equals(TxnPaths.segmentKeyFromAbortedPath(gr.getStat().getPath()));
+ },
+ consumer);
+ }
+
+ /**
+ * Delete a single aborted-txn record (and its index entry).
+ */
+ public CompletableFuture<Void> deleteAbortedTxn(String segment, String
txnId) {
+ Set<Option> opts = Set.of(new
Option.PartitionKey(TxnPaths.segmentKey(segment)));
+ return store.deleteIfExists(TxnPaths.segmentAbortedTxnPath(segment,
txnId),
+ Optional.empty(), opts);
+ }
+
+ /**
+ * Delete the segment's watermark record — used by the TB at
segment-teardown alongside
+ * deleting any remaining aborted records.
+ */
+ public CompletableFuture<Void> deleteSegmentWatermark(String segment) {
+ Set<Option> opts = Set.of(new
Option.PartitionKey(TxnPaths.segmentKey(segment)));
+ return store.deleteIfExists(TxnPaths.segmentWatermarkPath(segment),
Optional.empty(), opts);
+ }
+
// ---- JSON helpers ------------------------------------------------------
/** @return UTF-8 JSON bytes for {@code value}. Wraps any I/O error as
{@link CompletionException}. */
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index d9dbbb6b79a..f68d8d73bd1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -62,6 +62,13 @@ public final class TxnPaths {
*/
public static final String TXN_SUBSCRIPTION_EVENTS_PREFIX =
"/txn/subscription-events";
+ /**
+ * Path prefix for per-segment durable visibility state. Holds the
segment's watermark
+ * record and one aborted-txn record per aborted txn with still-readable
data in the
+ * segment. All records co-locate via {@code partitionKey =
segmentKey(segment)}.
+ */
+ public static final String TXN_SEGMENT_STATE_PREFIX = "/txn/segment-state";
+
/** Index: list write ops by segment. Key = segment. */
public static final String IDX_WRITES_BY_SEGMENT = "idx:writes-by-segment";
@@ -77,6 +84,14 @@ public final class TxnPaths {
*/
public static final String IDX_TXN_BY_FINAL_STATE =
"idx:txn-by-final-state";
+ /**
+ * Index: list per-segment aborted-txn records by max position. Key =
+ * {@code <encoded-segment>:padded(ledgerId):padded(entryId)}. Used by the
TB to scan
+ * its segment's aborted txns into an in-memory set on recovery, and to
range-delete
+ * the records as the segment ML trims past their max position.
+ */
+ public static final String IDX_TXN_ABORTED_BY_POSITION =
"idx:txn-aborted-by-position";
+
/** Width used when formatting long values into
lexicographically-orderable index keys. */
public static final int LONG_KEY_WIDTH = 20;
@@ -86,6 +101,18 @@ public final class TxnPaths {
*/
public static final String MAX_LONG_KEY = "99999999999999999999";
+ /**
+ * The minimum {@link #LONG_KEY_WIDTH}-wide decimal — useful as the lower
bound of a
+ * range scan that starts at position zero.
+ */
+ public static final String MIN_LONG_KEY = "00000000000000000000";
+
+ /** Suffix selecting the lowest {@code (ledgerId, entryId)} position in a
per-segment range. */
+ private static final String MIN_POSITION_SUFFIX = ":" + MIN_LONG_KEY + ":"
+ MIN_LONG_KEY;
+
+ /** Suffix selecting the highest {@code (ledgerId, entryId)} position in a
per-segment range. */
+ private static final String MAX_POSITION_SUFFIX = ":" + MAX_LONG_KEY + ":"
+ MAX_LONG_KEY;
+
/** @return {@code /txn/id/<txnId>} — the header path for {@code txnId}. */
public static String header(String txnId) {
return TXN_HEADER_PREFIX + "/" + txnId;
@@ -134,9 +161,97 @@ public final class TxnPaths {
return segmentKey(segment) + ":" + Codec.encode(subscription);
}
+ /** Parent path for per-segment watermark records. Records are direct
children of this. */
+ public static final String TXN_SEGMENT_WATERMARK_PREFIX =
TXN_SEGMENT_STATE_PREFIX + "/watermark";
+
+ /** Parent path for per-aborted-txn records, flat across all segments. */
+ public static final String TXN_SEGMENT_ABORTED_PREFIX =
TXN_SEGMENT_STATE_PREFIX + "/aborted";
+
+ /**
+ * @return {@code /txn/segment-state/watermark/<encoded-segment>} —
durable watermark record
+ * for {@code segment}. Direct child of {@link
#TXN_SEGMENT_WATERMARK_PREFIX} so the
+ * fallback {@code scanByIndex} path works on backends without a
native secondary index.
+ */
+ public static String segmentWatermarkPath(String segment) {
+ return TXN_SEGMENT_WATERMARK_PREFIX + "/" + segmentKey(segment);
+ }
+
+ /**
+ * @return {@code /txn/segment-state/aborted/<encoded-segment>:<txnId>} —
durable
+ * per-aborted-txn record. Direct child of {@link
#TXN_SEGMENT_ABORTED_PREFIX} (flat
+ * across all segments) so the fallback scan finds it; {@code
<encoded-segment>:<txnId>}
+ * keeps the per-segment grouping addressable.
+ */
+ public static String segmentAbortedTxnPath(String segment, String txnId) {
+ return TXN_SEGMENT_ABORTED_PREFIX + "/" + segmentKey(segment) + ":" +
txnId;
+ }
+
+ /**
+ * @return the {@link #IDX_TXN_ABORTED_BY_POSITION} index key for a
per-segment aborted-txn
+ * record. Format: {@code
<encoded-segment>:padded(ledgerId):padded(entryId)} — the
+ * encoded prefix scopes scans to one segment; the padded position is
lexicographically
+ * ordered so range scans by trim point work naturally.
+ */
+ public static String abortedByPositionIndexKey(String segment, long
ledgerId, long entryId) {
+ return segmentKey(segment) + ":" + longKey(ledgerId) + ":" +
longKey(entryId);
+ }
+
+ /** @return the lower bound of the per-segment range in {@link
#IDX_TXN_ABORTED_BY_POSITION}. */
+ public static String abortedByPositionSegmentLowerBound(String segment) {
+ return segmentKey(segment) + MIN_POSITION_SUFFIX;
+ }
+
+ /** @return the upper bound of the per-segment range in {@link
#IDX_TXN_ABORTED_BY_POSITION}. */
+ public static String abortedByPositionSegmentUpperBound(String segment) {
+ return segmentKey(segment) + MAX_POSITION_SUFFIX;
+ }
+
+ /**
+ * Extract the {@code txnId} part from a path returned by {@link
#segmentAbortedTxnPath}. The
+ * path is {@code .../aborted/<encoded-segment>:<txnId>}; {@code
encoded-segment} is URL-
+ * encoded (no {@code :}) so the first {@code :} in the trailing name is
the segment / txn
+ * separator.
+ *
+ * @return the txnId key, or {@code null} if {@code abortedPath} doesn't
match
+ */
+ public static String txnIdFromAbortedPath(String abortedPath) {
+ int lastSlash = abortedPath.lastIndexOf('/');
+ if (lastSlash < 0) {
+ return null;
+ }
+ String name = abortedPath.substring(lastSlash + 1);
+ int colon = name.indexOf(':');
+ if (colon < 0 || colon == name.length() - 1) {
+ return null;
+ }
+ return name.substring(colon + 1);
+ }
+
+ /**
+ * @return the {@code segmentKey} part from a path returned by {@link
#segmentAbortedTxnPath}.
+ */
+ public static String segmentKeyFromAbortedPath(String abortedPath) {
+ int lastSlash = abortedPath.lastIndexOf('/');
+ if (lastSlash < 0) {
+ return null;
+ }
+ String name = abortedPath.substring(lastSlash + 1);
+ int colon = name.indexOf(':');
+ return colon < 0 ? null : name.substring(0, colon);
+ }
+
/** @return {@code value} formatted as a zero-padded fixed-width decimal
for use as a range-scan index key. */
public static String longKey(long value) {
- return String.format("%0" + LONG_KEY_WIDTH + "d", value);
+ // value is always non-negative here (ledger/entry ids, epoch millis),
and a long never
+ // exceeds LONG_KEY_WIDTH digits — so build the padded string directly
and skip the
+ // String.format overhead on this index-key hot path.
+ char[] buf = new char[LONG_KEY_WIDTH];
+ long v = value;
+ for (int i = LONG_KEY_WIDTH - 1; i >= 0; i--) {
+ buf[i] = (char) ('0' + (int) (v % 10));
+ v /= 10;
+ }
+ return new String(buf);
}
/** @return the composite final-state index key {@code
<state>:padded(finalizedMs)}. */
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
index 44b6e01aaa4..804e12d1bd5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -29,6 +29,7 @@ import io.netty.buffer.Unpooled;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -36,12 +37,14 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.metadata.AbortedTxnRecord;
import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
import org.apache.pulsar.broker.transaction.metadata.TxnIds;
import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
import org.apache.pulsar.broker.transaction.metadata.TxnOp;
import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnPaths;
import org.apache.pulsar.broker.transaction.metadata.TxnState;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -171,47 +174,214 @@ public class MetadataTransactionBufferTest {
}
@Test
- public void unknownTxn_isTxnAbortedReturnsTrue() throws Exception {
+ public void unknownTxn_isTxnAbortedReturnsFalse() throws Exception {
+ // P3.5 semantics: "below watermark + not in aborted set → visible".
Unknown txns default
+ // visible — orphans are eliminated by the publish-path ordering
reversal (op-record
+ // before ML append), so a txnId we have no record of is either
long-cleaned-up
+ // (committed) or never existed. Either way it must read as committed, i.e. visible.
MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
tb.checkIfTBRecoverCompletely().get();
- // Never seen — must be filtered as aborted (orphan or long-cleaned).
- assertThat(tb.isTxnAborted(new TxnID(99, 99),
PositionFactory.create(1, 0))).isTrue();
+ assertThat(tb.isTxnAborted(new TxnID(99, 99),
PositionFactory.create(1, 0))).isFalse();
}
@Test
- public void recovery_rebuildsOpenTxnStateFromOpRecords() throws Exception {
- // Pre-populate: one OPEN txn with two writes on this segment, one
COMMITTED txn with one
- // (lingering) write, and one txn touching a different segment (must
not appear here).
- TxnID openTxn = new TxnID(1, 1);
- TxnID committedTxn = new TxnID(1, 2);
- TxnID otherSegTxn = new TxnID(1, 3);
+ public void restartAfterCommit_committedDataStillVisible() throws
Exception {
+ // The bug P3.5 fixes: after a TB restart on a topic with old
committed transactional
+ // data, the original /txn/id/<txnId> headers may have been GC'd. With
P3's
+ // "unknown → aborted" default + cleanup-on-apply, the committed
messages would be
+ // wrongly filtered. With P3.5's "unknown → visible (committed) below
watermark", they
+ // are not.
+ //
+ // Simulate: pre-populate the durable per-segment watermark
(representing a TB instance
+ // that previously processed commits and persisted the resolved-below
mark). The header
+ // and /txn/op records are absent — GC ran. Construct a fresh TB and
verify the
+ // committed txn's messages are visible.
+
+ TxnID oldCommittedTxn = new TxnID(7, 42);
+ // Watermark at (5,0): every position below it was resolved by the
previous TB instance.
+ txnStore.casSegmentWatermark(SEGMENT,
+ new
org.apache.pulsar.broker.transaction.metadata.SegmentWatermark(5, 0),
+ Optional.of(-1L)).get();
- createOpenHeader(openTxn);
- txnStore.appendOp(TxnIds.toKey(openTxn),
- new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 1L, null)).get();
- txnStore.appendOp(TxnIds.toKey(openTxn),
- new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 2L, null)).get();
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
- txnStore.createHeader(TxnIds.toKey(committedTxn),
- new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
- Instant.ofEpochMilli(1000),
Instant.ofEpochMilli(2000))).get();
- txnStore.appendOp(TxnIds.toKey(committedTxn),
- new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 3L, null)).get();
+ // The bug fix: isTxnAborted no longer treats unknown-below-watermark
as aborted, so
+ // committed messages whose original /txn/id headers have been GC'd
are still visible.
+ assertThat(tb.isTxnAborted(oldCommittedTxn, PositionFactory.create(3,
5))).isFalse();
+ // With no open txns recovered, maxReadPosition can advance past the
watermark to LAC —
+ // the absence of /txn/op records is positive evidence (by the
orphan-elimination
+ // invariant) that no unresolved txn data sits above the watermark.
+
assertThat(tb.getMaxReadPosition()).isEqualTo(ledger.getLastConfirmedEntry());
+ }
+
+ @Test
+ public void restartAfterAbort_abortedTxnStillFiltered() throws Exception {
+ // Mirror of the commit scenario: an aborted txn's durable record
persists in
+ // /txn/segment-state/aborted/<encoded-segment>:<txnId> with an index entry on
max position.
+ // Across restarts, isTxnAborted continues to return true even if the
original
+ // /txn/id/<txnId> header has been GC'd.
+
+ TxnID oldAbortedTxn = new TxnID(7, 99);
+ String txnIdKey = TxnIds.toKey(oldAbortedTxn);
+ txnStore.casSegmentWatermark(SEGMENT,
+ new
org.apache.pulsar.broker.transaction.metadata.SegmentWatermark(5, 0),
+ Optional.of(-1L)).get();
+ // Persisted aborted record. Header doesn't exist (GC'd).
+ txnStore.putAbortedTxn(SEGMENT, txnIdKey, 3L, 5L).get();
+
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
- createOpenHeader(otherSegTxn);
- txnStore.appendOp(TxnIds.toKey(otherSegTxn),
- new TxnOp(TxnOpKind.WRITE,
"segment://public/default/topic/other-seg", null, 5L, 9L, null)).get();
+ assertThat(tb.isTxnAborted(oldAbortedTxn, PositionFactory.create(3,
5))).isTrue();
+ }
+
+ @Test
+ public void recoveryDiscoveredOpenTxn_pinsAtWatermark() throws Exception {
+ // /txn/op records exist for an open txn (broker was processing
publishes for txn T;
+ // T's resolution hadn't yet completed; watermark was persisted at 5:0
before the crash).
+ // On restart the TB pins maxReadPosition at the persisted watermark —
it doesn't know
+ // T's real positions and must not expose any segment data above the
watermark until T
+ // resolves.
+
+ TxnID openTxn = new TxnID(1, 1);
+ createOpenHeader(openTxn);
+ // Position values in the legacy /txn/op format are no longer read by
recovery; only the
+ // existence of the record matters (it tells recovery this txn is
involved on this segment).
+ txnStore.appendOp(TxnIds.toKey(openTxn),
+ new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 0L, 0L, null)).get();
+ // An OPEN txn whose only op record is on a *different* segment must
not be recovered here.
+ TxnID otherSegmentTxn = new TxnID(2, 2);
+ createOpenHeader(otherSegmentTxn);
+ txnStore.appendOp(TxnIds.toKey(otherSegmentTxn),
+ new TxnOp(TxnOpKind.WRITE,
"segment://public/default/topic/0000-ffff-1",
+ null, 0L, 0L, null)).get();
+ txnStore.casSegmentWatermark(SEGMENT,
+ new
org.apache.pulsar.broker.transaction.metadata.SegmentWatermark(5, 0),
+ Optional.of(-1L)).get();
MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
tb.checkIfTBRecoverCompletely().get();
- // openTxn pins max read position at min(5:1, 5:2) - 1 = 5:0.
assertThat(tb.getMaxReadPosition()).isEqualTo(PositionFactory.create(5, 0));
+ // Only openTxn is tracked: otherSegmentTxn's op record is on another segment and
isn't recovered here.
assertThat(tb.getOngoingTxnCount()).isOne();
- assertThat(tb.isTxnAborted(openTxn, PositionFactory.create(5,
1))).isFalse();
- // Committed-txn lingering writes should not pin max read position;
the txn isn't in OPEN set.
- // (Cleanup is async; we don't assert on its completion here.)
+ // An OPEN recovery-discovered txn is not in the aborted set.
+ assertThat(tb.isTxnAborted(openTxn, PositionFactory.create(5,
0))).isFalse();
+ }
+
+ @Test
+ public void
recoveryDiscoveredAbortedTxns_filteredImmediatelyAfterRecovery() throws
Exception {
+ // Regression for the recovery race: txns whose headers are ABORTED
and whose only remaining
+ // trace is a leftover /txn/op record — no durable aborted record was
ever written (the
+ // previous broker crashed before applyTerminalNow persisted it).
isTxnAborted must return
+ // true for every such txn the instant recovery completes. ABORTED
recovery entries aren't
+ // watermark-pinned, so any visibility window here would expose the
aborted data.
+ //
+ // The race only opens with multiple terminal txns: the first
applyTerminalNow runs
+ // synchronously (stateTail was complete) and adds itself, but a
second is queued behind the
+ // first's in-flight persist chain. We force exactly that by hanging
the first putAbortedTxn,
+ // so the second txn's queued apply cannot run before recovery
completes — only the
+ // synchronous hydrate in applyHeaderForRecovery makes it filtered in
time.
+
+ TxnID abortedA = new TxnID(7, 42);
+ TxnID abortedB = new TxnID(7, 43);
+ for (TxnID t : new TxnID[] {abortedA, abortedB}) {
+ createOpenHeader(t);
+ abortTxn(t);
+ // Leftover op record on this segment; no putAbortedTxn — durable
aborted record is gone.
+ txnStore.appendOp(TxnIds.toKey(t),
+ new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 0L, 0L,
null)).get();
+ }
+
+ // Spy the store and hang the first persisted-aborted-record write so
its apply chain stays
+ // in flight, blocking any txn queued behind it on stateTail.
+ TxnMetadataStore spied = org.mockito.Mockito.spy(txnStore);
+ CompletableFuture<Stat> firstPut = new CompletableFuture<>();
+ java.util.concurrent.atomic.AtomicBoolean first = new
java.util.concurrent.atomic.AtomicBoolean(true);
+ doAnswer(inv -> {
+ if (first.getAndSet(false)) {
+ return firstPut;
+ }
+ return inv.callRealMethod();
+ }).when(spied).putAbortedTxn(any(), any(),
org.mockito.ArgumentMatchers.anyLong(),
+ org.mockito.ArgumentMatchers.anyLong());
+
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
spied);
+ tb.checkIfTBRecoverCompletely().get();
+
+ // Both must be filtered as of recovery completion — without the
synchronous hydrate, the
+ // txn queued behind the hung persist would still read as visible here.
+ assertThat(tb.isTxnAborted(abortedA, PositionFactory.create(10,
0))).isTrue();
+ assertThat(tb.isTxnAborted(abortedB, PositionFactory.create(10,
0))).isTrue();
+
+ // Release the hung write so the apply chain can drain cleanly.
+ firstPut.complete(null);
+ }
+
+ @Test
+ public void
recoveryDiscoveredAbortedTxn_persistsAbortedRecordAtSegmentLac() throws
Exception {
+ // Regression for the prune-key bug: a recovery-discovered aborted txn
has unknown
+ // positions, so persistAbortedRecord falls back to the segment LAC —
NOT the durable
+ // watermark. The txn's data sits above the watermark, so keying the
record on the
+ // watermark would let trim-pruning drop it while the data is still
readable.
+ TxnID abortedTxn = new TxnID(8, 99);
+ String txnIdKey = TxnIds.toKey(abortedTxn);
+ createOpenHeader(abortedTxn);
+ abortTxn(abortedTxn);
+ txnStore.appendOp(txnIdKey, new TxnOp(TxnOpKind.WRITE, SEGMENT, null,
0L, 0L, null)).get();
+ // Watermark (5,0) is well below the segment LAC (10,0).
+ txnStore.casSegmentWatermark(SEGMENT,
+ new
org.apache.pulsar.broker.transaction.metadata.SegmentWatermark(5, 0),
+ Optional.of(-1L)).get();
+
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+
+ // The aborted record is persisted on stateTail after recovery
completes; await it and
+ // assert its stored max position is the LAC (10,0), not the watermark
(5,0).
+ Awaitility.await().untilAsserted(() -> {
+ var opt = store.get(TxnPaths.segmentAbortedTxnPath(SEGMENT,
txnIdKey)).get();
+ assertThat(opt).isPresent();
+ AbortedTxnRecord rec =
TxnMetadataStore.fromJson(opt.get().getValue(), AbortedTxnRecord.class);
+ assertThat(rec.maxLedgerId()).isEqualTo(10L);
+ assertThat(rec.maxEntryId()).isEqualTo(0L);
+ });
+ }
+
+ @Test
+ public void publishOrdering_opRecordWrittenBeforeMlAppend() throws
Exception {
+ // The orphan-elimination invariant: when appendBufferToTxn returns,
both /txn/op and
+ // the ML entry exist. With the reversed ordering, the op record is
written first; if
+ // the ML append then fails, the op record is still present and the
TC's timeout sweep
+ // will abort the txn and clean up. The reverse — entry without op —
cannot happen by
+ // construction.
+
+ TxnID txnId = new TxnID(1, 1);
+ createOpenHeader(txnId);
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+
+ tb.appendBufferToTxn(txnId, 0, payload("a")).get();
+
+ // /txn/op record should exist for this txn on this segment.
+ java.util.List<TxnOp> hits = new java.util.ArrayList<>();
+ txnStore.listWritesBySegment(SEGMENT,
+ new org.apache.pulsar.metadata.api.ScanConsumer() {
+ @Override
+ public void
onNext(org.apache.pulsar.metadata.api.GetResult r) {
+ hits.add(TxnMetadataStore.fromJson(r.getValue(),
TxnOp.class));
+ }
+ @Override public void onError(Throwable t) { }
+ @Override public void onCompleted() { }
+ }).get();
+ assertThat(hits).hasSize(1);
+ assertThat(hits.get(0).getKind()).isEqualTo(TxnOpKind.WRITE);
+ // The WRITE op record carries sentinel positions (0/0); positions
live in TB's
+ // in-memory tracking, not in the op record.
+ assertThat(hits.get(0).getLedgerId()).isZero();
+ assertThat(hits.get(0).getEntryId()).isZero();
}
// ---- helpers -----------------------------------------------------------