This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 94de2b6806b [improve][txn] PIP-473: GC aborted-transaction records on ML trim and segment drop (#25975)
94de2b6806b is described below

commit 94de2b6806bbc39bddf9c32bde6755ee3a206716
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Jun 9 09:35:28 2026 -0700

    [improve][txn] PIP-473: GC aborted-transaction records on ML trim and segment drop (#25975)
---
 .../service/scalable/ScalableTopicService.java     |  24 +++-
 .../buffer/impl/MetadataTransactionBuffer.java     | 124 +++++++++++++++++++++
 .../transaction/metadata/TxnMetadataStore.java     |  40 +++++++
 .../service/scalable/ScalableTopicServiceTest.java |   2 +
 .../buffer/impl/MetadataTransactionBufferTest.java |  30 +++++
 .../transaction/metadata/TxnMetadataStoreTest.java |  35 ++++++
 6 files changed, 253 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
index 56b049181ca..a6c6befd6f3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
 import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -232,6 +233,13 @@ public class ScalableTopicService {
      * Delete a scalable topic and all its segment topics.
      */
     public CompletableFuture<Void> deleteScalableTopic(TopicName topic) {
+        // When transactions are enabled, the segments carry durable 
/txn/segment-state records
+        // (watermark + aborted-txn records). Delete them alongside the 
segment topics so they don't
+        // outlive the data.
+        TxnMetadataStore txnStore =
+                
brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
+                        ? new 
TxnMetadataStore(brokerService.getPulsar().getLocalMetadataStore())
+                        : null;
         return releaseController(topic)
                 .thenCompose(__ -> 
resources.getScalableTopicMetadataAsync(topic))
                 .thenCompose(optMd -> {
@@ -239,16 +247,28 @@ public class ScalableTopicService {
                         return CompletableFuture.completedFuture(null);
                     }
                     ScalableTopicMetadata metadata = optMd.get();
-                    // Delete all underlying segment topics
+                    // Delete all underlying segment topics, then their 
durable transaction state.
                     return FutureUtil.waitForAll(
                             metadata.getSegments().values().stream()
-                                    .map(segment -> 
deleteUnderlyingSegmentTopic(topic, segment))
+                                    .map(segment -> 
deleteUnderlyingSegmentTopic(topic, segment)
+                                            .thenCompose(__ -> 
cleanupSegmentTxnState(txnStore, topic, segment)))
                                     .toList()
                     );
                 })
                 .thenCompose(__ -> resources.deleteScalableTopicAsync(topic));
     }
 
+    /** Delete the durable {@code /txn/segment-state} records for a segment 
being dropped. */
+    private CompletableFuture<Void> cleanupSegmentTxnState(TxnMetadataStore 
txnStore,
+                                                          TopicName 
parentTopic, SegmentInfo segment) {
+        if (txnStore == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        String segmentName = SegmentTopicName.fromParent(
+                parentTopic, segment.hashRange(), 
segment.segmentId()).toString();
+        return txnStore.deleteAllSegmentState(segmentName);
+    }
+
     /**
      * Register a scalable consumer with the controller leader for {@code 
topic}.
      * Persists a durable session and returns the consumer's segment 
assignment.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index 099bea5f211..afbdd1b8b3a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,6 +30,10 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import lombok.CustomLog;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -42,6 +47,7 @@ import 
org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.broker.transaction.metadata.AbortedTxnRecord;
 import org.apache.pulsar.broker.transaction.metadata.SegmentWatermark;
 import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
 import org.apache.pulsar.broker.transaction.metadata.TxnIds;
@@ -55,6 +61,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.Runnables;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.ScanConsumer;
@@ -132,6 +139,11 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
     private final LongAdder committedCount = new LongAdder();
     private final LongAdder abortedCount = new LongAdder();
 
+    /** Periodic task that range-deletes aborted-txn records once the segment 
ML trims past them. */
+    private final ScheduledFuture<?> abortedGcTask;
+    /** Guards against a new GC cycle starting while the previous async one is 
still in flight. */
+    private final AtomicBoolean gcRunning = new AtomicBoolean(false);
+
     public MetadataTransactionBuffer(PersistentTopic topic, TxnMetadataStore 
txnStore) {
         this.topic = topic;
         this.ledger = topic.getManagedLedger();
@@ -140,6 +152,55 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
         this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
         this.maxReadPosition = ledger.getLastConfirmedEntry();
         recover();
+        this.abortedGcTask = scheduleAbortedGc();
+    }
+
+    /**
+     * Schedule the periodic aborted-record GC on the broker executor. Returns 
{@code null} when no
+     * executor is reachable (e.g. a unit test with a mocked topic); such 
callers drive
+     * {@link #pruneTrimmedAbortedTxns()} directly.
+     */
+    private ScheduledFuture<?> scheduleAbortedGc() {
+        ScheduledExecutorService executor = brokerExecutor();
+        if (executor == null) {
+            return null;
+        }
+        long intervalSeconds = Math.max(1, 
topic.getBrokerService().getPulsar().getConfiguration()
+                .getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
+        long intervalMs = TimeUnit.SECONDS.toMillis(intervalSeconds);
+        // Wrap in catchingAndLoggingThrowables so an unexpected 
RuntimeException doesn't cancel the
+        // fixed-delay schedule. The gcRunning guard skips a cycle while the 
previous async sweep is
+        // still in flight (slow metadata store) rather than overlapping 
sweeps.
+        return 
executor.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(() -> {
+            if (closed || !gcRunning.compareAndSet(false, true)) {
+                return;
+            }
+            CompletableFuture<Void> sweep;
+            try {
+                sweep = pruneTrimmedAbortedTxns();
+            } catch (Throwable t) {
+                gcRunning.set(false);
+                throw t;
+            }
+            sweep.whenComplete((__, ex) -> {
+                gcRunning.set(false);
+                if (ex != null) {
+                    log.warn().attr("segment", segmentName).exception(ex)
+                            .log("Aborted-txn GC sweep failed; will retry next 
cycle");
+                }
+            });
+        }), intervalMs, intervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    private ScheduledExecutorService brokerExecutor() {
+        try {
+            if (topic.getBrokerService() != null && 
topic.getBrokerService().getPulsar() != null) {
+                return topic.getBrokerService().getPulsar().getExecutor();
+            }
+        } catch (Throwable t) {
+            // Mocked topic in unit tests — no broker executor; GC is driven 
directly.
+        }
+        return null;
     }
 
     // ---- Recovery ----------------------------------------------------------
@@ -625,10 +686,73 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
     @Override
     public CompletableFuture<Void> closeAsync() {
         closed = true;
+        if (abortedGcTask != null) {
+            abortedGcTask.cancel(false);
+        }
         closeSubscriptionQuietly();
         return CompletableFuture.completedFuture(null);
     }
 
+    /**
+     * Range-delete aborted-txn records — and drop their in-memory {@link 
#abortedTxns} entries —
+     * whose highest position in this segment is below the ML's first 
still-valid position, i.e. whose
+     * data has been fully trimmed. Safe because a trimmed position is never 
dispatched, so its abort
+     * filtering is no longer needed; records for still-readable data (max 
position at or above the
+     * first valid position) are retained. Without this the durable aborted 
set and the heap set grow
+     * for the segment's whole lifetime even as the underlying data is trimmed 
away.
+     */
+    @VisibleForTesting
+    CompletableFuture<Void> pruneTrimmedAbortedTxns() {
+        if (closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        Position firstValid = ledger.getFirstPosition();
+        if (firstValid == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        List<String> toPrune = Collections.synchronizedList(new ArrayList<>());
+        return txnStore.scanAbortedTxns(segmentName,
+                TxnPaths.abortedByPositionSegmentLowerBound(segmentName),
+                TxnPaths.abortedByPositionSegmentUpperBound(segmentName),
+                new ScanConsumer() {
+                    @Override
+                    public void onNext(GetResult r) {
+                        String txnIdKey = 
TxnPaths.txnIdFromAbortedPath(r.getStat().getPath());
+                        if (txnIdKey == null) {
+                            return;
+                        }
+                        AbortedTxnRecord rec = 
TxnMetadataStore.fromJson(r.getValue(), AbortedTxnRecord.class);
+                        Position maxPos = 
PositionFactory.create(rec.maxLedgerId(), rec.maxEntryId());
+                        // Strictly below the first valid position → fully 
trimmed (conservative).
+                        if (maxPos.compareTo(firstValid) < 0) {
+                            toPrune.add(txnIdKey);
+                        }
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        log.warn().attr("segment", 
segmentName).exception(throwable)
+                                .log("Aborted-txn GC scan errored");
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                    }
+                }).thenCompose(__ -> {
+                    if (toPrune.isEmpty()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    synchronized (lock) {
+                        toPrune.forEach(abortedTxns::remove);
+                    }
+                    List<CompletableFuture<Void>> deletes = new 
ArrayList<>(toPrune.size());
+                    for (String txnIdKey : toPrune) {
+                        deletes.add(txnStore.deleteAbortedTxn(segmentName, 
txnIdKey));
+                    }
+                    return FutureUtil.waitForAll(deletes);
+                });
+    }
+
     private void closeSubscriptionQuietly() {
         AutoCloseable handle = subscription;
         if (handle == null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index ebf300b7725..ec7c25b389b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.transaction.metadata;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -29,6 +31,7 @@ import java.util.function.Consumer;
 import lombok.CustomLog;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Option;
@@ -398,6 +401,43 @@ public class TxnMetadataStore {
         return store.deleteIfExists(TxnPaths.segmentWatermarkPath(segment), 
Optional.empty(), opts);
     }
 
+    /**
+     * Delete all durable per-segment transaction state — every aborted-txn 
record and the watermark —
+     * when a segment is dropped (e.g. the scalable topic is deleted), so the 
{@code /txn/segment-state}
+     * records don't outlive the segment's data. Idempotent: missing records 
are no-ops.
+     */
+    public CompletableFuture<Void> deleteAllSegmentState(String segment) {
+        List<String> abortedKeys = Collections.synchronizedList(new 
ArrayList<>());
+        return scanAbortedTxns(segment,
+                TxnPaths.abortedByPositionSegmentLowerBound(segment),
+                TxnPaths.abortedByPositionSegmentUpperBound(segment),
+                new ScanConsumer() {
+                    @Override
+                    public void onNext(GetResult r) {
+                        String txnIdKey = 
TxnPaths.txnIdFromAbortedPath(r.getStat().getPath());
+                        if (txnIdKey != null) {
+                            abortedKeys.add(txnIdKey);
+                        }
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        log.warn().attr("segment", 
segment).exception(throwable)
+                                .log("Segment-state cleanup scan errored");
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                    }
+                }).thenCompose(__ -> {
+                    List<CompletableFuture<Void>> deletes = new 
ArrayList<>(abortedKeys.size());
+                    for (String txnIdKey : abortedKeys) {
+                        deletes.add(deleteAbortedTxn(segment, txnIdKey));
+                    }
+                    return FutureUtil.waitForAll(deletes);
+                }).thenCompose(__ -> deleteSegmentWatermark(segment));
+    }
+
     // ---- TC sequence counter ----------------------------------------------
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
index 725016f6e67..47271c2dc02 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.resources.SubscriptionType;
@@ -94,6 +95,7 @@ public class ScalableTopicServiceTest {
         scalableTopicsAdmin = mock(ScalableTopics.class);
 
         when(brokerService.getPulsar()).thenReturn(pulsar);
+        when(pulsar.getConfiguration()).thenReturn(new ServiceConfiguration());
         when(brokerService.getTopicIfExists(anyString()))
                 
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
         when(pulsar.getBrokerId()).thenReturn(BROKER_ID);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
index ec3476916a9..a7f3890111b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -279,6 +279,36 @@ public class MetadataTransactionBufferTest {
         assertThat(tb.isTxnAborted(oldAbortedTxn, PositionFactory.create(3, 
5))).isTrue();
     }
 
+    @Test
+    public void pruneTrimmedAborted_dropsBelowFirstValid_retainsAbove() throws 
Exception {
+        // An aborted txn whose data the ML has fully trimmed (max position 
below the first valid
+        // position) is dropped from both the durable aborted records and the 
in-memory set; an
+        // aborted txn whose data is still readable is retained.
+        TxnID trimmedTxn = new TxnID(1, 100);   // max position on ledger 1 — 
will be trimmed away
+        TxnID liveTxn = new TxnID(1, 200);      // max position on ledger 10 — 
still readable
+        txnStore.putAbortedTxn(SEGMENT, TxnIds.toKey(trimmedTxn), 1L, 
5L).get();
+        txnStore.putAbortedTxn(SEGMENT, TxnIds.toKey(liveTxn), 10L, 5L).get();
+
+        MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, 
txnStore);
+        tb.checkIfTBRecoverCompletely().get();
+        assertThat(tb.isTxnAborted(trimmedTxn, PositionFactory.create(1, 
5))).isTrue();
+        assertThat(tb.isTxnAborted(liveTxn, PositionFactory.create(10, 
5))).isTrue();
+
+        // The ML has trimmed everything below ledger 5.
+        when(ledger.getFirstPosition()).thenReturn(PositionFactory.create(5, 
0));
+        tb.pruneTrimmedAbortedTxns().get();
+
+        // In-memory: trimmed dropped, live retained.
+        assertThat(tb.isTxnAborted(trimmedTxn, PositionFactory.create(1, 
5))).isFalse();
+        assertThat(tb.isTxnAborted(liveTxn, PositionFactory.create(10, 
5))).isTrue();
+
+        // Durable record also deleted: a fresh TB recovers only the live txn.
+        MetadataTransactionBuffer tb2 = new MetadataTransactionBuffer(topic, 
txnStore);
+        tb2.checkIfTBRecoverCompletely().get();
+        assertThat(tb2.isTxnAborted(trimmedTxn, PositionFactory.create(1, 
5))).isFalse();
+        assertThat(tb2.isTxnAborted(liveTxn, PositionFactory.create(10, 
5))).isTrue();
+    }
+
     @Test
     public void recoveryDiscoveredOpenTxn_pinsAtWatermark() throws Exception {
         // /txn/op records exist for an open txn (broker was processing 
publishes for txn T;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
index 40c04ec0a8f..19d22f3f46e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
@@ -24,6 +24,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import lombok.Cleanup;
 import org.apache.pulsar.metadata.api.GetResult;
@@ -219,6 +220,40 @@ public class TxnMetadataStoreTest {
                 
assertThat(received).isNotEmpty().last().asString().isEqualTo(s.getPath()));
     }
 
+    @Test
+    public void deleteAllSegmentState_removesAbortedRecordsAndWatermark() 
throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+        String segment = "segment://public/default/topic/0000-ffff-0";
+
+        txn.putAbortedTxn(segment, "t1", 1L, 5L).get();
+        txn.putAbortedTxn(segment, "t2", 2L, 7L).get();
+        txn.casSegmentWatermark(segment, new SegmentWatermark(3, 0), 
Optional.empty()).get();
+
+        txn.deleteAllSegmentState(segment).get();
+
+        List<String> remaining = new ArrayList<>();
+        txn.scanAbortedTxns(segment,
+                TxnPaths.abortedByPositionSegmentLowerBound(segment),
+                TxnPaths.abortedByPositionSegmentUpperBound(segment),
+                new ScanConsumer() {
+                    @Override
+                    public void onNext(GetResult r) {
+                        remaining.add(r.getStat().getPath());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                    }
+                }).get();
+        assertThat(remaining).isEmpty();
+        assertThat(txn.getSegmentWatermark(segment).get()).isEmpty();
+    }
+
     // ---- helpers -----------------------------------------------------------
 
     private static ScanConsumer collectHeaders(List<TxnHeader> out) {

Reply via email to