This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 301d6559563 [fix][txn] PIP-473: prune terminal transactions from the metadata buffer cache (#25960)
301d6559563 is described below

commit 301d65595633ab625bea788cb16bf2bb51ec499c
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 8 09:13:41 2026 -0700

    [fix][txn] PIP-473: prune terminal transactions from the metadata buffer cache (#25960)
---
 .../buffer/impl/MetadataTransactionBuffer.java     | 15 ++++++++
 .../buffer/impl/MetadataTransactionBufferTest.java | 42 ++++++++++++++++++++++
 2 files changed, 57 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index 4ac6b89ae51..099bea5f211 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -434,6 +435,12 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
                 // it (e.g. after an in-memory rebuild that lost the set).
                 abortedTxns.add(txnIdKey);
             }
+            // Drop the now-terminal entry so the cache stays bounded by the open-txn count rather
+            // than growing for the segment's lifetime. This is safe: recomputeMaxReadPositionLocked
+            // only consults OPEN entries, and isTxnAborted reads the separate abortedTxns set (an
+            // aborted txn stays there, a committed/unknown one correctly reads as visible). The
+            // positions needed for the durable side-effects below were already captured above.
+            txns.remove(txnIdKey);
         }
 
         // Persist aborted record if this is an abort.
@@ -704,6 +711,14 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
         }
     }
 
+    /** Size of the in-memory per-txn cache; bounded by the open-txn count once terminals are pruned. */
+    @VisibleForTesting
+    int trackedTxnCount() {
+        synchronized (lock) {
+            return txns.size();
+        }
+    }
+
     @Override
     public long getAbortedTxnCount() {
         return abortedCount.sum();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
index 8234eea3832..ec3476916a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -157,6 +157,48 @@ public class MetadataTransactionBufferTest {
         });
     }
 
+    @Test
+    public void terminalTxns_prunedFromCache_visibilityUnchanged() throws Exception {
+        // Resolve many transactions (mixed commit/abort) and confirm the in-memory per-txn cache is
+        // pruned back to empty rather than growing for the segment's lifetime, while visibility stays
+        // correct: aborted txns remain filtered (via the durable aborted set) and committed/unknown
+        // txns remain visible.
+        MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, txnStore);
+        tb.checkIfTBRecoverCompletely().get();
+
+        int n = 20;
+        TxnID lastAborted = null;
+        Position lastAbortedPos = null;
+        TxnID lastCommitted = null;
+        for (int i = 1; i <= n; i++) {
+            TxnID txnId = new TxnID(1, i);
+            createOpenHeader(txnId);
+            Position p = tb.appendBufferToTxn(txnId, 0, payload("v" + i)).get();
+            if (i % 2 == 0) {
+                commitTxn(txnId);
+                txnStore.publishSegmentEvent(SEGMENT, new TxnEvent(TxnIds.toKey(txnId), TxnState.COMMITTED)).get();
+                lastCommitted = txnId;
+            } else {
+                abortTxn(txnId);
+                txnStore.publishSegmentEvent(SEGMENT, new TxnEvent(TxnIds.toKey(txnId), TxnState.ABORTED)).get();
+                lastAborted = txnId;
+                lastAbortedPos = p;
+            }
+        }
+
+        // Once every txn is terminal, the cache holds nothing (no OPEN txns remain).
+        Awaitility.await().untilAsserted(() -> {
+            assertThat(tb.getOngoingTxnCount()).isZero();
+            assertThat(tb.trackedTxnCount()).isZero();
+        });
+
+        // Visibility correctness survives pruning.
+        assertThat(tb.isTxnAborted(lastAborted, lastAbortedPos)).isTrue();
+        assertThat(tb.isTxnAborted(lastCommitted, PositionFactory.create(1, 0))).isFalse();
+        assertThat(tb.getCommittedTxnCount()).isEqualTo(n / 2);
+        assertThat(tb.getAbortedTxnCount()).isEqualTo(n / 2);
+    }
+
     @Test
     public void appendToCommittedTxn_failsTxnConflict() throws Exception {
         TxnID txnId = new TxnID(1, 1);

Reply via email to