This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fd0961d8f09 [improve][broker] PIP-473: MetadataTransactionBuffer for 
segment topics (#25768)
fd0961d8f09 is described below

commit fd0961d8f096305cdf03defaa80e6e0c06027bb3
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 13 17:49:50 2026 -0700

    [improve][broker] PIP-473: MetadataTransactionBuffer for segment topics 
(#25768)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   6 +-
 .../impl/DispatchingTransactionBufferProvider.java |  43 ++
 .../buffer/impl/MetadataTransactionBuffer.java     | 533 +++++++++++++++++++++
 .../impl/MetadataTransactionBufferProvider.java    |  41 ++
 .../pulsar/broker/transaction/metadata/TxnIds.java |  57 +++
 .../transaction/metadata/TxnMetadataStore.java     |  33 ++
 .../broker/transaction/metadata/TxnPaths.java      |  20 +
 .../DispatchingTransactionBufferProviderTest.java  |  78 +++
 .../buffer/impl/MetadataTransactionBufferTest.java | 274 +++++++++++
 .../broker/transaction/metadata/TxnIdsTest.java    |  74 +++
 10 files changed, 1158 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b22f9a4bef8..4c3651bf90b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3738,7 +3738,11 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 
     @FieldContext(
             category = CATEGORY_TRANSACTION,
-            doc = "Class name for transaction buffer provider"
+            doc = "Class name for transaction buffer provider. Default routes 
segment:// topics to the"
+                    + " legacy TopicTransactionBuffer. Set this to"
+                    + " 
org.apache.pulsar.broker.transaction.buffer.impl.DispatchingTransactionBufferProvider"
+                    + " once the v5 transaction coordinator (PIP-473 P5) is 
enabled to opt segment topics"
+                    + " into MetadataTransactionBuffer."
     )
     private String transactionBufferProviderClassName =
             
"org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider";
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProvider.java
new file mode 100644
index 00000000000..b58c38bd929
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Opt-in {@link TransactionBufferProvider} that picks the delegate provider from the topic name:
+ * {@link MetadataTransactionBufferProvider} for {@code segment://} topics (PIP-473) and
+ * {@link TopicTransactionBufferProvider} for every other topic.
+ *
+ * <p>This class is not the configured default: {@code transactionBufferProviderClassName}
+ * defaults to {@code TopicTransactionBufferProvider}, which keeps segment topics on the legacy
+ * buffer. Operators set that property to this class to route segment topics to the
+ * metadata-driven implementation.
+ */
+public class DispatchingTransactionBufferProvider implements 
TransactionBufferProvider {
+
+    private final TransactionBufferProvider legacy = new 
TopicTransactionBufferProvider();
+    private final TransactionBufferProvider metadata = new 
MetadataTransactionBufferProvider();
+
+    @Override
+    public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+        return TopicName.get(originTopic.getName()).isSegment()
+                ? metadata.newTransactionBuffer(originTopic)
+                : legacy.newTransactionBuffer(originTopic);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
new file mode 100644
index 00000000000..42f38a78428
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.CustomLog;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnPaths;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.broker.transaction.metadata.Versioned;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+
+/**
+ * {@link TransactionBuffer} for {@code segment://} topics that reads truth 
from the metadata-store
+ * transaction layout (PIP-473) rather than from a per-topic snapshot log.
+ *
+ * <p>Lifecycle:
+ * <ul>
+ *   <li><b>Publish</b> — {@link #appendBufferToTxn} reads the txn header 
(cache-first), appends the
+ *       entry to the managed ledger, then appends a {@link TxnOp} record under
+ *       {@code /txn-op/<txnId>-<seq>}. Both must succeed before we ack the 
publisher.</li>
+ *   <li><b>State transitions</b> — driven by {@code 
/txn-segment-events/<segment>-*} sequence
+ *       events. The events are wake-ups; the truth is the header. On each 
notification we
+ *       re-read headers for every currently-open txn and apply the resulting 
state changes.</li>
+ *   <li><b>Recovery</b> (Option C) — scan {@code idx:writes-by-segment} for 
this segment, group by
+ *       {@code txnId}, fetch each header, and seed the in-memory cache. Then 
subscribe to the
+ *       event stream for forward updates.</li>
+ * </ul>
+ *
+ * <p><b>TC ordering contract.</b> There is a TOCTOU window between the header 
authorization read
+ * in {@link #appendBufferToTxn} and the managed-ledger append: the TC may 
flip the header (commit
+ * or abort) in between, and the entry still lands. On commit that's harmless 
— the message is
+ * visible. On abort, the subsequent segment-event delivery marks the txn 
ABORTED in the cache and
+ * {@link #isTxnAborted} filters it. This relies on the TC publishing the 
segment event <em>after</em>
+ * the header CAS lands so a participant that lost the race always learns the 
decision. The legacy
+ * {@code TopicTransactionBuffer} has the same window with marker-message 
ordering.
+ *
+ * <p><b>In-memory growth.</b> Terminal txns stay in the {@code txns} cache 
for the segment's
+ * lifetime so {@code isTxnAborted} can answer authoritatively for dispatcher 
reads — evicting a
+ * COMMITTED entry would mean the default "unknown → aborted" filter wrongly 
hides its messages.
+ * Long-running segments with high txn turnover will accumulate cached 
entries. Cache pruning tied
+ * to data-ledger trimming / header GC is a P5/P6 concern.
+ */
+@CustomLog
+public class MetadataTransactionBuffer implements TransactionBuffer {
+
+    private final PersistentTopic topic;
+    private final ManagedLedger ledger;
+    private final TxnMetadataStore txnStore;
+    private final String segmentName;
+    private final TopicTransactionBuffer.MaxReadPositionCallBack 
maxReadPositionCallBack;
+
+    private final CompletableFuture<Void> recoveryFuture = new 
CompletableFuture<>();
+    private volatile AutoCloseable subscription;
+    private volatile boolean closed;
+
+    /** Guards {@link #txns} + {@link #maxReadPosition} + {@link 
#lastDispatchable}. */
+    private final Object lock = new Object();
+
+    /** Cached per-txn state, populated by appendBufferToTxn and refreshed by 
event reconcile. */
+    private final Map<String, TxnEntry> txns = new HashMap<>();
+
+    private Position maxReadPosition;
+    private Position lastDispatchable;
+
+    private final LongAdder committedCount = new LongAdder();
+    private final LongAdder abortedCount = new LongAdder();
+
+    public MetadataTransactionBuffer(PersistentTopic topic, TxnMetadataStore 
txnStore) {
+        this.topic = topic;
+        this.ledger = topic.getManagedLedger();
+        this.txnStore = txnStore;
+        this.segmentName = topic.getName();
+        this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
+        this.maxReadPosition = ledger.getLastConfirmedEntry();
+        this.lastDispatchable = this.maxReadPosition;
+        recover();
+    }
+
+    // ---- Recovery ----------------------------------------------------------
+
+    /**
+     * Rebuilds the in-memory txn cache from the metadata store and completes {@code recoveryFuture}.
+     *
+     * <p>Steps, in order:
+     * <ol>
+     *   <li>Subscribe to this segment's events; a subscribe failure fails recovery right away.</li>
+     *   <li>Scan the write-op records of this segment and group their positions by txn id.</li>
+     *   <li>Read one header per txn id and seed the cache via {@code applyHeaderForRecovery}.</li>
+     *   <li>Recompute the max read position, complete the future and run one reconcile pass.</li>
+     * </ol>
+     * If the scan or a header read fails, the subscription is closed and the future completes
+     * exceptionally.
+     */
+    private void recover() {
+        try {
+            subscription = txnStore.subscribeSegmentEvents(segmentName, path -> triggerReconcile());
+        } catch (MetadataStoreException e) {
+            recoveryFuture.completeExceptionally(e);
+            return;
+        }
+
+        // txnId key -> positions of all write ops recorded for this segment.
+        Map<String, List<Position>> writesByTxn = new ConcurrentHashMap<>();
+        ScanConsumer collector = new ScanConsumer() {
+            @Override
+            public void onNext(GetResult result) {
+                TxnOp op = TxnMetadataStore.fromJson(result.getValue(), TxnOp.class);
+                String txnIdKey = TxnPaths.txnIdFromOpPath(result.getStat().getPath());
+                if (txnIdKey != null) {
+                    List<Position> writes = writesByTxn.computeIfAbsent(txnIdKey, k -> new ArrayList<>());
+                    writes.add(PositionFactory.create(op.getLedgerId(), op.getEntryId()));
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // Nothing to do here: the failure is propagated through the future returned by
+                // listWritesBySegment.
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        };
+
+        txnStore.listWritesBySegment(segmentName, collector)
+                .thenCompose(__ -> {
+                    // One header read per distinct txn id; each read seeds that txn's cache entry.
+                    List<CompletableFuture<Void>> headerReads = new ArrayList<>();
+                    for (Map.Entry<String, List<Position>> grouped : writesByTxn.entrySet()) {
+                        String txnIdKey = grouped.getKey();
+                        List<Position> positions = grouped.getValue();
+                        headerReads.add(txnStore.getHeader(txnIdKey)
+                                .thenAccept(header -> applyHeaderForRecovery(txnIdKey, header, positions)));
+                    }
+                    return FutureUtil.waitForAll(headerReads);
+                })
+                .whenComplete((__, failure) -> {
+                    if (failure == null) {
+                        synchronized (lock) {
+                            recomputeMaxReadPositionLocked();
+                        }
+                        recoveryFuture.complete(null);
+                        // triggerReconcile() returns early while recoveryFuture is incomplete, so
+                        // events delivered since the subscription was opened had no effect; run
+                        // one reconcile pass now to pick up those state transitions.
+                        triggerReconcile();
+                        return;
+                    }
+                    log.error().attr("segment", segmentName).exception(failure).log("TB recovery failed");
+                    // closeAsync() may never be called for a buffer that failed to recover, so the
+                    // listener is released here instead of outliving this instance.
+                    closeSubscriptionQuietly();
+                    recoveryFuture.completeExceptionally(failure);
+                });
+    }
+
+    /**
+     * Seeds the cache entry of one txn found by the recovery scan.
+     *
+     * @param txnIdKey  key of the txn, as extracted from its op-record paths
+     * @param opt       header read from the metadata store; an absent header is treated as
+     *                  {@code ABORTED}
+     * @param positions positions of the writes this txn made on this segment
+     */
+    private void applyHeaderForRecovery(String txnIdKey, Optional<Versioned<TxnHeader>> opt,
+                                        List<Position> positions) {
+        TxnState state = opt.map(v -> v.value().getState()).orElse(TxnState.ABORTED);
+        boolean terminal = state.isTerminal();
+        // Only an OPEN txn pins maxReadPosition, so only it needs its earliest write. A terminal
+        // entry carries a null firstPosition, the same shape applyReconciledState leaves behind.
+        Position first = terminal ? null : positions.stream().min(Position::compareTo).orElse(null);
+        synchronized (lock) {
+            txns.put(txnIdKey, new TxnEntry(state, first));
+        }
+        if (terminal) {
+            // Best-effort and asynchronous: drop the op records of a txn that is already decided.
+            cleanupOpRecords(txnIdKey);
+        }
+    }
+
+    // ---- Publish path ------------------------------------------------------
+
+    /**
+     * Appends a transactional message once recovery has completed.
+     *
+     * @param txnId      txn the message belongs to
+     * @param sequenceId not used by this implementation
+     * @param buffer     message payload; retained for the duration of the asynchronous chain and
+     *                   released when the returned future completes, normally or exceptionally
+     * @return future completing with the position of the appended entry
+     */
+    @Override
+    public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
+        buffer.retain();
+        CompletableFuture<Position> appended =
+                recoveryFuture.thenCompose(recovered -> internalAppend(txnId, buffer));
+        return appended.whenComplete((position, failure) -> buffer.release());
+    }
+
+    private CompletableFuture<Position> internalAppend(TxnID txnId, ByteBuf 
buffer) {
+        if (closed) {
+            return FutureUtil.failedFuture(new 
BrokerServiceException.ServiceUnitNotReadyException(
+                    "Transaction buffer is closed"));
+        }
+        String txnIdKey = TxnIds.toKey(txnId);
+        return readStateCacheFirst(txnIdKey)
+                .thenCompose(state -> {
+                    if (state.isTerminal()) {
+                        return FutureUtil.failedFuture(new 
BrokerServiceException.NotAllowedException(
+                                "Transaction " + txnId + " is already " + 
state + " — TxnConflict"));
+                    }
+                    return appendToLedger(buffer)
+                            .thenCompose(position -> recordOp(txnId, txnIdKey, 
position));
+                });
+    }
+
+    /**
+     * Returns the txn state from the cache when present; otherwise reads the header from the
+     * metadata store, caches the result if no entry appeared meanwhile, and returns the state
+     * that was read. An absent header is treated as {@code ABORTED}.
+     */
+    private CompletableFuture<TxnState> readStateCacheFirst(String txnIdKey) {
+        synchronized (lock) {
+            TxnEntry known = txns.get(txnIdKey);
+            if (known != null) {
+                return CompletableFuture.completedFuture(known.state);
+            }
+        }
+        return txnStore.getHeader(txnIdKey).thenApply(header -> {
+            TxnState fetched = header.map(v -> v.value().getState()).orElse(TxnState.ABORTED);
+            synchronized (lock) {
+                // Never overwrite an entry that was created while the read was in flight.
+                txns.computeIfAbsent(txnIdKey, k -> new TxnEntry(fetched, null));
+            }
+            return fetched;
+        });
+    }
+
+    /** Bridges {@code ManagedLedger#asyncAddEntry} to a future carrying the entry position. */
+    private CompletableFuture<Position> appendToLedger(ByteBuf buffer) {
+        final CompletableFuture<Position> appended = new CompletableFuture<>();
+        AsyncCallbacks.AddEntryCallback bridge = new AsyncCallbacks.AddEntryCallback() {
+            @Override
+            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                appended.complete(position);
+            }
+
+            @Override
+            public void addFailed(ManagedLedgerException exception, Object ctx) {
+                appended.completeExceptionally(exception);
+            }
+        };
+        ledger.asyncAddEntry(buffer, bridge, null);
+        return appended;
+    }
+
+    /**
+     * Stores a WRITE op record for {@code position} and, if the txn is still OPEN in the cache,
+     * tracks {@code position} as its first write when it is earlier than the one already known.
+     *
+     * @return future completing with {@code position} once the op record is stored
+     */
+    private CompletableFuture<Position> recordOp(TxnID txnId, String txnIdKey, Position position) {
+        TxnOp writeOp = new TxnOp(TxnOpKind.WRITE, segmentName, null,
+                position.getLedgerId(), position.getEntryId());
+        return txnStore.appendOp(txnIdKey, writeOp).thenApply(stat -> {
+            synchronized (lock) {
+                TxnEntry entry = txns.get(txnIdKey);
+                // Only an OPEN entry pins maxReadPosition. A reconcile pass may have moved the txn
+                // to a terminal state since the authorization read; such an entry is left as is
+                // rather than being treated as OPEN again.
+                boolean stillOpen = entry != null && entry.state == TxnState.OPEN;
+                if (stillOpen
+                        && (entry.firstPosition == null || position.compareTo(entry.firstPosition) < 0)) {
+                    entry.firstPosition = position;
+                    recomputeMaxReadPositionLocked();
+                }
+            }
+            return position;
+        });
+    }
+
+    // ---- Reconcile (event-driven) -----------------------------------------
+
+    /**
+     * Re-reads the header of every txn that is OPEN in the cache and applies any state change.
+     * Does nothing when the buffer is closed, recovery has not finished, or no txn is open.
+     * Failures are logged only.
+     */
+    private void triggerReconcile() {
+        if (closed || !recoveryFuture.isDone()) {
+            return;
+        }
+        Set<String> openKeys;
+        synchronized (lock) {
+            openKeys = new HashSet<>(openTxnsLocked());
+        }
+        if (openKeys.isEmpty()) {
+            return;
+        }
+        List<CompletableFuture<Void>> pending = new ArrayList<>(openKeys.size());
+        for (String txnIdKey : openKeys) {
+            pending.add(refreshFromHeader(txnIdKey));
+        }
+        FutureUtil.waitForAll(pending).whenComplete((ignored, failure) -> {
+            if (failure != null) {
+                log.warn().attr("segment", segmentName).exception(failure).log("Reconcile encountered error");
+            }
+        });
+    }
+
+    /** Reads one txn header (absent means ABORTED) and feeds its state to the reconcile logic. */
+    private CompletableFuture<Void> refreshFromHeader(String txnIdKey) {
+        return txnStore.getHeader(txnIdKey).thenAccept(header -> {
+            TxnState observed = header.map(v -> v.value().getState()).orElse(TxnState.ABORTED);
+            applyReconciledState(txnIdKey, observed);
+        });
+    }
+
+    /**
+     * Applies a freshly read state to the cached entry. Unknown txns and unchanged states are
+     * ignored. A transition to a terminal state clears the entry's first position, bumps the
+     * committed/aborted counter, recomputes the max read position and schedules op-record cleanup.
+     */
+    private void applyReconciledState(String txnIdKey, TxnState newState) {
+        synchronized (lock) {
+            TxnEntry entry = txns.get(txnIdKey);
+            if (entry == null || entry.state == newState) {
+                return;
+            }
+            entry.state = newState;
+            if (!newState.isTerminal()) {
+                return;
+            }
+            entry.firstPosition = null;
+            LongAdder counter = (newState == TxnState.COMMITTED) ? committedCount : abortedCount;
+            counter.increment();
+            recomputeMaxReadPositionLocked();
+        }
+        // Reached only after a transition to a terminal state; runs outside the lock.
+        cleanupOpRecords(txnIdKey);
+    }
+
+    /**
+     * Deletes every {@code /txn-op} record for {@code (this segment, txnIdKey)}. Best-effort: a
+     * failure is logged and swallowed. Reconcile passes only revisit OPEN txns, so a failed
+     * cleanup of a terminal txn is not retried there; the leftover records are found again by the
+     * recovery scan, which re-issues the cleanup for terminal txns.
+     *
+     * @param txnIdKey key of the terminal txn whose op records should be removed
+     */
+    private void cleanupOpRecords(String txnIdKey) {
+        txnStore.deleteWriteOpsForSegmentAndTxn(segmentName, txnIdKey)
+                .exceptionally(err -> {
+                    log.warn().attr("segment", segmentName).attr("txnId", txnIdKey).exception(err)
+                            .log("Op-record cleanup failed; will retry on next recovery");
+                    return null;
+                });
+    }
+
+    // ---- maxReadPosition ---------------------------------------------------
+
+    /** Returns a fresh set with the keys of all cached txns in state OPEN. Caller holds {@code lock}. */
+    private Set<String> openTxnsLocked() {
+        Set<String> openKeys = new HashSet<>();
+        txns.forEach((txnIdKey, entry) -> {
+            if (entry.state == TxnState.OPEN) {
+                openKeys.add(txnIdKey);
+            }
+        });
+        return openKeys;
+    }
+
+    private void recomputeMaxReadPositionLocked() {
+        Position min = null;
+        for (TxnEntry e : txns.values()) {
+            if (e.state == TxnState.OPEN && e.firstPosition != null) {
+                if (min == null || e.firstPosition.compareTo(min) < 0) {
+                    min = e.firstPosition;
+                }
+            }
+        }
+        Position next = (min == null) ? lastDispatchable : 
ledger.getPreviousPosition(min);
+        Position prev = maxReadPosition;
+        maxReadPosition = next;
+        // Only fire the callback on forward motion. Initial-state setup at 
recovery may move
+        // the position backwards (LAC -> previous(firstOpenWrite)); that's 
not a "moved forward".
+        if (next.compareTo(prev) > 0 && maxReadPositionCallBack != null) {
+            maxReadPositionCallBack.maxReadPositionMovedForward(prev, next);
+        }
+    }
+
+    // ---- SPI surface (lifecycle, queries) ---------------------------------
+
+    @Override // Returns the current max read position, read under the lock that guards it.
+    public Position getMaxReadPosition() {
+        synchronized (lock) {
+            return maxReadPosition;
+        }
+    }
+
+    /**
+     * Tells whether messages of {@code txnID} must be filtered out.
+     *
+     * @param txnID        txn to look up in the cache
+     * @param readPosition not used by this implementation
+     * @return {@code true} if the cached state is ABORTED or the txn is not in the cache
+     */
+    @Override
+    public boolean isTxnAborted(TxnID txnID, Position readPosition) {
+        final String txnIdKey = TxnIds.toKey(txnID);
+        synchronized (lock) {
+            TxnEntry entry = txns.get(txnIdKey);
+            // No record of this txn: either an orphan (broker crash mid-publish) or one that was
+            // aborted and cleaned long ago. Filtering is the safe default.
+            return entry == null || entry.state == TxnState.ABORTED;
+        }
+    }
+
+    /**
+     * Advances the last dispatchable position after a non-transactional publish and recomputes
+     * the max read position. Marker messages are ignored.
+     */
+    @Override
+    public void syncMaxReadPositionForNormalPublish(Position position, boolean isMarkerMessage) {
+        if (!isMarkerMessage) {
+            topic.updateLastDispatchablePosition(position);
+            synchronized (lock) {
+                lastDispatchable = position;
+                recomputeMaxReadPositionLocked();
+            }
+        }
+    }
+
+    @Override // Exposes the recovery future: completed by recover(), exceptionally on failure.
+    public CompletableFuture<Void> checkIfTBRecoverCompletely() {
+        return recoveryFuture;
+    }
+
+    @Override // Marks the buffer closed and closes the event subscription; completes immediately.
+    public CompletableFuture<Void> closeAsync() {
+        closed = true; // checked by internalAppend and triggerReconcile
+        closeSubscriptionQuietly();
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private void closeSubscriptionQuietly() {
+        AutoCloseable handle = subscription;
+        if (handle == null) {
+            return;
+        }
+        subscription = null;
+        try {
+            handle.close();
+        } catch (Throwable t) {
+            log.warn().attr("segment", segmentName).exception(t).log("Failed 
to close subscription");
+        }
+    }
+
+    @Override // Ignores both arguments and returns an already-completed future.
+    public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
+        // No-op for the metadata-driven TB: v5 commits are driven by the TC 
writing /txn header CAS
+        // and the segment-event stream, not by direct SPI calls.
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override // Ignores both arguments and returns an already-completed future.
+    public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
+        // No-op (see commitTxn): cached txn state changes only through header reads in reconcile.
+        return CompletableFuture.completedFuture(null);
+    }
+
+    // ---- SPI surface (snapshots / readers — unused in v5) -----------------
+
+    @Override // Always completes with a null value, whatever txnID is passed.
+    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override // Unsupported here: always returns a future failed with NotAllowedException.
+    public CompletableFuture<TransactionBufferReader> 
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
+        return FutureUtil.failedFuture(new 
BrokerServiceException.NotAllowedException(
+                "openTransactionBufferReader is not supported on segment 
topics"));
+    }
+
+    @Override // No-op: ignores dataLedgers and returns an already-completed future.
+    public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override // No-op: returns an already-completed future.
+    public CompletableFuture<Void> clearSnapshot() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override // Same as closeAsync(); nothing else is cleared here.
+    public CompletableFuture<Void> clearSnapshotAndClose() {
+        return closeAsync();
+    }
+
+    @Override // Always null. NOTE(review): confirm callers tolerate a null snapshot type.
+    public AbortedTxnProcessor.SnapshotType getSnapshotType() {
+        return null;
+    }
+
+    @Override // Always null. NOTE(review): confirm stats callers handle a null result.
+    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
+        return null;
+    }
+
+    @Override // Always null; both flags are ignored. NOTE(review): confirm callers handle null.
+    public TransactionBufferStats getStats(boolean lowWaterMarks, boolean 
segmentStats) {
+        return null;
+    }
+
+    @Override // Always null; the flag is ignored. NOTE(review): confirm callers handle null.
+    public TransactionBufferStats getStats(boolean lowWaterMarks) {
+        return null;
+    }
+
+    /** @return the number of cached txns currently in state OPEN, counted under {@code lock}. */
+    @Override
+    public long getOngoingTxnCount() {
+        synchronized (lock) {
+            return txns.values().stream()
+                    .filter(entry -> entry.state == TxnState.OPEN)
+                    .count();
+        }
+    }
+
+    @Override // Counts reconcile transitions to a terminal non-COMMITTED state; recovery adds none.
+    public long getAbortedTxnCount() {
+        return abortedCount.sum();
+    }
+
+    @Override // Counts reconcile transitions to COMMITTED; recovery adds none.
+    public long getCommittedTxnCount() {
+        return committedCount.sum();
+    }
+
+    // ---- helpers ----------------------------------------------------------
+
+    private static final class TxnEntry { // mutable cache record for one txn
+        TxnState state; // last state observed for the txn; read and written under `lock`
+        Position firstPosition; // null if no writes on this segment yet, or 
after termination
+
+        TxnEntry(TxnState state, Position firstPosition) {
+            this.state = state;
+            this.firstPosition = firstPosition;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferProvider.java
new file mode 100644
index 00000000000..036098bebe4
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+
+/**
+ * {@link TransactionBufferProvider} that creates a {@link MetadataTransactionBuffer} whose
+ * {@link TxnMetadataStore} wraps the broker's local metadata store. Intended for
+ * {@code segment://} topics; {@code DispatchingTransactionBufferProvider} delegates here for them.
+ */
+public class MetadataTransactionBufferProvider implements TransactionBufferProvider {
+
+    /**
+     * Builds a new buffer, with its own {@link TxnMetadataStore}, for the given topic.
+     *
+     * @param originTopic the topic to attach the buffer to; it is cast to {@link PersistentTopic}
+     * @return a new {@link MetadataTransactionBuffer}
+     */
+    @Override
+    public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+        final PersistentTopic persistentTopic = (PersistentTopic) originTopic;
+        return new MetadataTransactionBuffer(persistentTopic, new TxnMetadataStore(
+                persistentTopic.getBrokerService().getPulsar().getLocalMetadataStore()));
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnIds.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnIds.java
new file mode 100644
index 00000000000..b3b7c961e9e
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnIds.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+/**
+ * Round-trip between {@link TxnID} and the string form used in metadata-store 
paths and
+ * partition keys (e.g. {@code /txn/<txnId>}, {@code partitionKey = txnId}).
+ *
+ * <p>Format: {@code <mostSigBits>_<leastSigBits>}. Path-friendly (no 
parens/commas) and round-trips
+ * losslessly for any pair of {@code long} values — including negatives — 
because {@code _} cannot
+ * appear inside a decimal long literal (Java's {@link Long#parseLong} rejects 
it). {@link TxnID#toString}
+ * uses {@code (most,least)} which leaks shell-unfriendly characters into 
paths; this helper is the
+ * single point that controls the on-the-wire encoding.
+ */
+public final class TxnIds {
+
+    private static final char SEP = '_';
+
+    /** @return {@code <most>_<least>}, suitable for use as a metadata-store 
path segment. */
+    public static String toKey(TxnID txnId) {
+        return txnId.getMostSigBits() + String.valueOf(SEP) + 
txnId.getLeastSigBits();
+    }
+
+    /**
+     * @return the {@link TxnID} parsed from {@code key}.
+     * @throws IllegalArgumentException if {@code key} is not in the expected 
{@code <most>_<least>} form
+     */
+    public static TxnID fromKey(String key) {
+        int sep = key.indexOf(SEP);
+        if (sep <= 0 || sep == key.length() - 1 || key.indexOf(SEP, sep + 1) 
>= 0) {
+            throw new IllegalArgumentException("Invalid txnId key: " + key);
+        }
+        long most = Long.parseLong(key, 0, sep, 10);
+        long least = Long.parseLong(key, sep + 1, key.length(), 10);
+        return new TxnID(most, least);
+    }
+
+    private TxnIds() {}
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index 5c2087e3fba..8e20ed305f8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -155,6 +155,39 @@ public class TxnMetadataStore {
                 consumer);
     }
 
+    /**
+     * Delete every {@code /txn-op} write record for {@code (segment, txnId)} 
— used by the TB once
+     * an event tells it the txn is terminal. Path extraction follows the 
layout in
+     * {@link TxnPaths#txnIdFromOpPath}. Best-effort: tolerates concurrent 
deletions via
+     * {@link MetadataStore#deleteIfExists}.
+     */
+    public CompletableFuture<Void> deleteWriteOpsForSegmentAndTxn(String 
segment, String txnId) {
+        java.util.List<String> paths = new java.util.ArrayList<>();
+        return listWritesBySegment(segment, new ScanConsumer() {
+            @Override
+            public void onNext(org.apache.pulsar.metadata.api.GetResult r) {
+                if 
(txnId.equals(TxnPaths.txnIdFromOpPath(r.getStat().getPath()))) {
+                    paths.add(r.getStat().getPath());
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        }).thenCompose(__ -> {
+            Set<Option> opts = Set.of(new Option.PartitionKey(txnId));
+            CompletableFuture<?>[] deletes = new 
CompletableFuture<?>[paths.size()];
+            for (int i = 0; i < paths.size(); i++) {
+                deletes[i] = store.deleteIfExists(paths.get(i), 
Optional.empty(), opts);
+            }
+            return CompletableFuture.allOf(deletes);
+        });
+    }
+
     /**
      * Stream open transactions whose deadline falls in {@code 
[fromMsInclusive, toMsInclusive]}.
      * Pass {@code null} on either bound for an unbounded range.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index c8952af1048..d90c7a470cf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -140,5 +140,25 @@ public final class TxnPaths {
         return state.name() + ":" + longKey(finalizedMs);
     }
 
+    /**
+     * Extract the {@code txnId} key from a path under {@link #TXN_OP_PREFIX}. The path layout is
+     * {@code /txn-op/<txnId>-<paddedSeq>}, where txnId is the {@code <most>_<least>} key built by
+     * {@link TxnIds#toKey}. Either half may be negative and so carry its own {@code -} sign, which
+     * is why the split is made on the <em>last</em> dash of the final path element: everything
+     * before it is the txnId key. (Assumes {@code paddedSeq} itself never contains a dash.)
+     *
+     * @param opPath path of a {@code /txn-op} record; may be {@code null}
+     * @return the txnId key, or {@code null} if {@code opPath} doesn't have the expected shape
+     */
+    public static String txnIdFromOpPath(String opPath) {
+        if (opPath == null) {
+            return null;
+        }
+        int lastSlash = opPath.lastIndexOf('/');
+        if (lastSlash < 0) {
+            return null;
+        }
+        String name = opPath.substring(lastSlash + 1);
+        int seqDash = name.lastIndexOf('-');
+        // seqDash == 0 would leave an empty txnId key, so it is rejected together with "no dash".
+        if (seqDash <= 0) {
+            return null;
+        }
+        return name.substring(0, seqDash);
+    }
+
     private TxnPaths() {}
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProviderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProviderTest.java
new file mode 100644
index 00000000000..fccba498973
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProviderTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies how the default {@link DispatchingTransactionBufferProvider} picks a buffer:
+ * {@code segment://} topics receive a {@link MetadataTransactionBuffer}, while any other topic
+ * gets the legacy {@link TopicTransactionBuffer}.
+ */
+public class DispatchingTransactionBufferProviderTest {
+
+    @Test
+    public void routesSegmentTopicToMetadataBuffer() throws Exception {
+        MetadataStoreConfig storeConfig = MetadataStoreConfig.builder().fsyncEnable(false).build();
+        try (MetadataStoreExtended store = MetadataStoreExtended.create("memory:dispatcher-test", storeConfig)) {
+            PersistentTopic segmentTopic = mockTopic("segment://public/default/topic/0000-ffff-0", store);
+            DispatchingTransactionBufferProvider provider = new DispatchingTransactionBufferProvider();
+            TransactionBuffer buffer = provider.newTransactionBuffer(segmentTopic);
+            assertThat(buffer).isInstanceOf(MetadataTransactionBuffer.class);
+            // Close the buffer before try-with-resources closes the store handed to it via the mock.
+            buffer.closeAsync().get();
+        }
+    }
+
+    @Test
+    public void routesPersistentTopicToLegacyBuffer() {
+        PersistentTopic persistentTopic = mockTopic("persistent://public/default/topic", null);
+        DispatchingTransactionBufferProvider provider = new DispatchingTransactionBufferProvider();
+        TransactionBuffer buffer = provider.newTransactionBuffer(persistentTopic);
+        assertThat(buffer).isInstanceOf(TopicTransactionBuffer.class);
+    }
+
+    /**
+     * Build a deep-stubbed {@link PersistentTopic} named {@code name} whose managed ledger reports
+     * {@code 0:0} as last confirmed entry and completes every add at {@code 0:0}. When
+     * {@code store} is non-null it is returned as the broker's local metadata store.
+     */
+    private static PersistentTopic mockTopic(String name, MetadataStoreExtended store) {
+        ManagedLedger managedLedger = mock(ManagedLedger.class);
+        when(managedLedger.getLastConfirmedEntry()).thenReturn(PositionFactory.create(0, 0));
+        doAnswer(call -> {
+            AsyncCallbacks.AddEntryCallback callback = call.getArgument(1);
+            callback.addComplete(PositionFactory.create(0, 0), call.getArgument(0), call.getArgument(2));
+            return null;
+        }).when(managedLedger).asyncAddEntry(any(ByteBuf.class), any(), any());
+
+        PersistentTopic topic = mock(PersistentTopic.class, RETURNS_DEEP_STUBS);
+        when(topic.getName()).thenReturn(name);
+        when(topic.getManagedLedger()).thenReturn(managedLedger);
+        if (store != null) {
+            when(topic.getBrokerService().getPulsar().getLocalMetadataStore()).thenReturn(store);
+        }
+        return topic;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
new file mode 100644
index 00000000000..5e4dfe736b7
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
+import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link MetadataTransactionBuffer} against the in-memory {@code MetadataStore}.
+ * The TB's collaborators ({@link PersistentTopic}, {@link ManagedLedger}) are mocked so the tests
+ * focus on the buffer's state machine and metadata interactions.
+ *
+ * <p>The mocked ledger reports {@code 10:0} as its last confirmed entry and hands out positions
+ * {@code 10:1}, {@code 10:2}, ... for successive appends.
+ */
+public class MetadataTransactionBufferTest {
+
+    private static final String SEGMENT = "segment://public/default/topic/0000-ffff-0";
+
+    private MetadataStore store;
+    private TxnMetadataStore txnStore;
+    private ManagedLedger ledger;
+    private PersistentTopic topic;
+    private AtomicLong nextEntryId;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        store = MetadataStoreFactory.create("memory:local",
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+        txnStore = new TxnMetadataStore(store);
+        nextEntryId = new AtomicLong(1);
+        ledger = mockManagedLedger();
+        topic = mockTopic(SEGMENT, ledger);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    @Test
+    public void openTxnAppend_pinsMaxReadPosition() throws Exception {
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        MetadataTransactionBuffer tb = newRecoveredBuffer();
+
+        Position firstPos = tb.appendBufferToTxn(txnId, 0, payload("a")).get();
+
+        // maxReadPosition should be just before the first txn entry.
+        assertThat(tb.getMaxReadPosition()).isEqualTo(ledger.getPreviousPosition(firstPos));
+        assertThat(tb.isTxnAborted(txnId, firstPos)).isFalse();
+        assertThat(tb.getOngoingTxnCount()).isOne();
+    }
+
+    @Test
+    public void concurrentOpenTxns_minPositionPins() throws Exception {
+        TxnID t1 = new TxnID(1, 1);
+        TxnID t2 = new TxnID(1, 2);
+        createOpenHeader(t1);
+        createOpenHeader(t2);
+        MetadataTransactionBuffer tb = newRecoveredBuffer();
+
+        Position p1 = tb.appendBufferToTxn(t1, 0, payload("a")).get();
+        tb.appendBufferToTxn(t2, 0, payload("b")).get();
+
+        // With two open txns, the earlier first entry (t1's) is the one that pins the read position.
+        assertThat(tb.getMaxReadPosition()).isEqualTo(ledger.getPreviousPosition(p1));
+    }
+
+    @Test
+    public void commitEvent_dropsTxn_advancesMaxReadPosition() throws Exception {
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        MetadataTransactionBuffer tb = newRecoveredBuffer();
+        tb.appendBufferToTxn(txnId, 0, payload("a")).get();
+
+        // TC-side: flip header to COMMITTED + publish segment event.
+        commitTxn(txnId);
+        txnStore.publishSegmentEvent(SEGMENT, new TxnEvent(TxnIds.toKey(txnId), TxnState.COMMITTED)).get();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertThat(tb.getOngoingTxnCount()).isZero();
+            assertThat(tb.getCommittedTxnCount()).isOne();
+        });
+
+        // No OPEN txns pin the read position — should now sit at LAC.
+        assertThat(tb.getMaxReadPosition()).isEqualTo(ledger.getLastConfirmedEntry());
+    }
+
+    @Test
+    public void abortEvent_marksAborted_isTxnAbortedTrue() throws Exception {
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        MetadataTransactionBuffer tb = newRecoveredBuffer();
+        Position firstPos = tb.appendBufferToTxn(txnId, 0, payload("a")).get();
+
+        // TC-side: flip header to ABORTED + publish segment event.
+        abortTxn(txnId);
+        txnStore.publishSegmentEvent(SEGMENT, new TxnEvent(TxnIds.toKey(txnId), TxnState.ABORTED)).get();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertThat(tb.isTxnAborted(txnId, firstPos)).isTrue();
+            assertThat(tb.getAbortedTxnCount()).isOne();
+            assertThat(tb.getOngoingTxnCount()).isZero();
+        });
+    }
+
+    @Test
+    public void appendToCommittedTxn_failsTxnConflict() throws Exception {
+        TxnID txnId = new TxnID(1, 1);
+        // Pre-set header to COMMITTED — txn is terminal before any append.
+        Stat created = createCommittedHeader(txnId);
+        assertThat(created.getVersion()).isZero();
+
+        MetadataTransactionBuffer tb = newRecoveredBuffer();
+
+        assertThatThrownBy(() -> tb.appendBufferToTxn(txnId, 0, payload("a")).get())
+                .hasCauseInstanceOf(BrokerServiceException.NotAllowedException.class);
+    }
+
+    @Test
+    public void unknownTxn_isTxnAbortedReturnsTrue() throws Exception {
+        MetadataTransactionBuffer tb = newRecoveredBuffer();
+
+        // Never seen — must be filtered as aborted (orphan or long-cleaned).
+        assertThat(tb.isTxnAborted(new TxnID(99, 99), PositionFactory.create(1, 0))).isTrue();
+    }
+
+    @Test
+    public void recovery_rebuildsOpenTxnStateFromOpRecords() throws Exception {
+        // Pre-populate: one OPEN txn with two writes on this segment, one COMMITTED txn with one
+        // (lingering) write, and one txn touching a different segment (must not appear here).
+        TxnID openTxn = new TxnID(1, 1);
+        TxnID committedTxn = new TxnID(1, 2);
+        TxnID otherSegTxn = new TxnID(1, 3);
+
+        createOpenHeader(openTxn);
+        txnStore.appendOp(TxnIds.toKey(openTxn),
+                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 1L)).get();
+        txnStore.appendOp(TxnIds.toKey(openTxn),
+                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 2L)).get();
+
+        createCommittedHeader(committedTxn);
+        txnStore.appendOp(TxnIds.toKey(committedTxn),
+                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 3L)).get();
+
+        createOpenHeader(otherSegTxn);
+        txnStore.appendOp(TxnIds.toKey(otherSegTxn),
+                new TxnOp(TxnOpKind.WRITE, "segment://public/default/topic/other-seg", null, 5L, 9L)).get();
+
+        MetadataTransactionBuffer tb = newRecoveredBuffer();
+
+        // openTxn pins max read position at min(5:1, 5:2) - 1 = 5:0.
+        assertThat(tb.getMaxReadPosition()).isEqualTo(PositionFactory.create(5, 0));
+        assertThat(tb.getOngoingTxnCount()).isOne();
+        assertThat(tb.isTxnAborted(openTxn, PositionFactory.create(5, 1))).isFalse();
+        // Committed-txn lingering writes should not pin max read position; the txn isn't in OPEN set.
+        // (Cleanup is async; we don't assert on its completion here.)
+    }
+
+    // ---- helpers -----------------------------------------------------------
+
+    /** Build a buffer over the mocked topic and wait for {@code checkIfTBRecoverCompletely()} to finish. */
+    private MetadataTransactionBuffer newRecoveredBuffer() throws Exception {
+        MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, txnStore);
+        tb.checkIfTBRecoverCompletely().get();
+        return tb;
+    }
+
+    private static ByteBuf payload(String s) {
+        // Explicit charset: String#getBytes() would fall back to the platform default.
+        return Unpooled.copiedBuffer(s.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    private void createOpenHeader(TxnID txnId) throws Exception {
+        txnStore.createHeader(TxnIds.toKey(txnId),
+                new TxnHeader(TxnState.OPEN, Duration.ofMillis(60_000),
+                        Instant.ofEpochMilli(1000), null)).get();
+    }
+
+    /** Create a header that is already COMMITTED, i.e. terminal before the buffer ever sees the txn. */
+    private Stat createCommittedHeader(TxnID txnId) throws Exception {
+        return txnStore.createHeader(TxnIds.toKey(txnId),
+                new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
+                        Instant.ofEpochMilli(1000), Instant.ofEpochMilli(2000))).get();
+    }
+
+    private void commitTxn(TxnID txnId) throws Exception {
+        finalizeTxn(txnId, TxnState.COMMITTED);
+    }
+
+    private void abortTxn(TxnID txnId) throws Exception {
+        finalizeTxn(txnId, TxnState.ABORTED);
+    }
+
+    /**
+     * Rewrite the stored header of {@code txnId} with {@code terminalState}, keeping its timeout and
+     * creation time and passing {@code Instant.now()} as the last header argument. The update is
+     * conditional on the version that was just read.
+     */
+    private void finalizeTxn(TxnID txnId, TxnState terminalState) throws Exception {
+        String key = TxnIds.toKey(txnId);
+        var versioned = txnStore.getHeader(key).get().orElseThrow();
+        var header = versioned.value();
+        txnStore.updateHeader(key,
+                new TxnHeader(terminalState, header.getTimeout(), header.getCreatedAt(), Instant.now()),
+                versioned.version()).get();
+    }
+
+    private ManagedLedger mockManagedLedger() {
+        ManagedLedger ml = mock(ManagedLedger.class);
+        when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(10, 0));
+        // asyncAddEntry: synthesize a unique increasing position, invoke addComplete.
+        doAnswer(inv -> {
+            AsyncCallbacks.AddEntryCallback cb = inv.getArgument(1);
+            Position p = PositionFactory.create(10, nextEntryId.getAndIncrement());
+            cb.addComplete(p, inv.getArgument(0), inv.getArgument(2));
+            return null;
+        }).when(ml).asyncAddEntry(any(ByteBuf.class), any(), any());
+        // getPreviousPosition: same ledger, entry id minus one.
+        when(ml.getPreviousPosition(any())).thenAnswer(inv -> {
+            Position p = inv.getArgument(0);
+            return PositionFactory.create(p.getLedgerId(), p.getEntryId() - 1);
+        });
+        return ml;
+    }
+
+    private PersistentTopic mockTopic(String name, ManagedLedger ml) {
+        PersistentTopic t = mock(PersistentTopic.class);
+        when(t.getName()).thenReturn(name);
+        when(t.getManagedLedger()).thenReturn(ml);
+        when(t.getMaxReadPositionCallBack()).thenReturn(null);
+        return t;
+    }
+
+    // Not called by any test: it only keeps the java.util.Optional import referenced so the
+    // unused-import check passes.
+    @SuppressWarnings("unused")
+    private static <T> Optional<T> emptyOpt() {
+        return Optional.empty();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnIdsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnIdsTest.java
new file mode 100644
index 00000000000..20dd77ea0a3
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnIdsTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link TxnIds}: {@code toKey}/{@code fromKey} round-trips (including negative and
+ * extreme values) and rejection of keys that are not of the {@code <most>_<least>} form.
+ */
+public class TxnIdsTest {
+
+    @Test
+    public void roundTripPositive() {
+        TxnID id = new TxnID(1, 2);
+        assertThat(TxnIds.toKey(id)).isEqualTo("1_2");
+        assertThat(TxnIds.fromKey("1_2")).isEqualTo(id);
+    }
+
+    @Test
+    public void roundTripNegativeMostSigBits() {
+        TxnID id = new TxnID(-1, 1);
+        assertThat(TxnIds.toKey(id)).isEqualTo("-1_1");
+        assertThat(TxnIds.fromKey("-1_1")).isEqualTo(id);
+    }
+
+    @Test
+    public void roundTripBothNegative() {
+        TxnID id = new TxnID(-7, -42);
+        assertThat(TxnIds.toKey(id)).isEqualTo("-7_-42");
+        assertThat(TxnIds.fromKey("-7_-42")).isEqualTo(id);
+    }
+
+    @Test
+    public void roundTripExtremes() {
+        TxnID a = new TxnID(Long.MIN_VALUE, Long.MAX_VALUE);
+        TxnID b = new TxnID(Long.MAX_VALUE, Long.MIN_VALUE);
+        assertThat(TxnIds.fromKey(TxnIds.toKey(a))).isEqualTo(a);
+        assertThat(TxnIds.fromKey(TxnIds.toKey(b))).isEqualTo(b);
+    }
+
+    @Test
+    public void fromKeyRejectsMissingSeparator() {
+        assertThatThrownBy(() -> TxnIds.fromKey("nope"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void fromKeyRejectsEmptyKey() {
+        assertThatThrownBy(() -> TxnIds.fromKey(""))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void fromKeyRejectsLeadingSeparator() {
+        assertThatThrownBy(() -> TxnIds.fromKey("_1"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void fromKeyRejectsTrailingSeparator() {
+        assertThatThrownBy(() -> TxnIds.fromKey("1_"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void fromKeyRejectsExtraSeparators() {
+        assertThatThrownBy(() -> TxnIds.fromKey("1_2_3"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void fromKeyRejectsNonNumericParts() {
+        // Long.parseLong raises NumberFormatException, which is an IllegalArgumentException.
+        assertThatThrownBy(() -> TxnIds.fromKey("a_b"))
+                .isInstanceOf(IllegalArgumentException.class);
+        assertThatThrownBy(() -> TxnIds.fromKey("1_x"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+}

Reply via email to