This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 666d4bb3444 [feat][txn] Implement the AbortedTxnProcessor for 
TransactionBuffer (#17847)
666d4bb3444 is described below

commit 666d4bb34441942d20ddac1e9a1c094284067778
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Nov 1 01:33:57 2022 +0800

    [feat][txn] Implement the AbortedTxnProcessor for TransactionBuffer (#17847)
    
    Master Issue: https://github.com/apache/pulsar/issues/16913
    
    ### Motivation
    
    Implement an abortedTxnProcessor to handle the storage of the aborted 
transaction ID.
    ### Modifications
    The structure overview:
    
![image](https://user-images.githubusercontent.com/55571188/197683651-6ccb106d-1e71-4841-9da7-2644275a401a.png)
    
    The main idea is to move the logic of the operation of checking and 
persistent aborted transaction IDs(take snapshots)  and the operation of 
updating maxReadPosition into the AbortedTxnProcessor.
    And the AbortedTxnProcessor can be implemented in different designs.
    
    **Add `persistentWorker` to handle snapshot persistenting** :
    <img width="1003" alt="image" 
src="https://user-images.githubusercontent.com/55571188/198528131-3cde19bc-2034-4693-a8b1-4d6345e6db36.png";>
    The first four items below are the corresponding four tasks in the figure. 
The fifth item is not strictly a task, but a part of the first two tasks.
    * takeSnapshotSegmentAsync -> writeSnapshotSegmentAsync
        * These two method is used to persist the snapshot segment.
    * deleteSnapshotSegment
        * This method is used to delete the snapshot segment.
    
    * updateIndexMetadataForTheLastSnapshot
        * Using to update index metadata (the latest snapshot).
    
    * clearSnapshotSegmentAndIndexes
        * Delete all segments and then delete the index of this topic.
    
    
    * updateSnapshotIndex
        * Called by the deleteSnapshotSegment and writeSnapshotSegmentAsync. Do 
update the index after writing snapshot segment.
        * Called by recovery as a compensation mechanism for updating the index.
    
    ### Documentation
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    - [x] `doc-not-needed`
    (Please explain why)
    
    
    ### Matching PR in the forked repository
    
    PR in forked repository: https://github.com/liangyepianzhou/pulsar/pull/7
---
 .../broker/service/AbstractBaseDispatcher.java     |   3 +-
 .../broker/service/persistent/PersistentTopic.java |   4 +-
 .../transaction/buffer/AbortedTxnProcessor.java    |  77 ++++++
 .../transaction/buffer/TransactionBuffer.java      |   3 +-
 .../buffer/impl/InMemTransactionBuffer.java        |   2 +-
 .../SingleSnapshotAbortedTxnProcessorImpl.java     | 182 +++++++++++++++
 .../buffer/impl/TopicTransactionBuffer.java        | 259 ++++++---------------
 .../TopicTransactionBufferRecoverCallBack.java     |   8 -
 .../buffer/impl/TransactionBufferDisable.java      |   2 +-
 .../v2/TransactionBufferSnapshotIndexes.java       |   2 +-
 ... TransactionBufferSnapshotIndexesMetadata.java} |  21 +-
 .../broker/service/AbstractBaseDispatcherTest.java |   2 +-
 .../TopicTransactionBufferRecoverTest.java         |  73 +++---
 .../pulsar/broker/transaction/TransactionTest.java |  21 +-
 14 files changed, 413 insertions(+), 246 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index b19360d5e10..f9e8e61d400 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -163,7 +163,8 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
                     entry.release();
                     continue;
                 } else if (((PersistentTopic) subscription.getTopic())
-                        .isTxnAborted(new 
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
+                        .isTxnAborted(new 
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
+                                (PositionImpl) entry.getPosition())) {
                     individualAcknowledgeMessageIfNeeded(entry.getPosition(), 
Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 94cc0964c37..965c1e164a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3337,8 +3337,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return this.transactionBuffer.getMaxReadPosition();
     }
 
-    public boolean isTxnAborted(TxnID txnID) {
-        return this.transactionBuffer.isTxnAborted(txnID);
+    public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
+        return this.transactionBuffer.isTxnAborted(txnID, readPosition);
     }
 
     public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
new file mode 100644
index 00000000000..e436e1df249
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+
+public interface AbortedTxnProcessor {
+
+    /**
+     * After the transaction buffer writes a transaction aborted marker to the 
topic,
+     * the transaction buffer will put the aborted txnID and the aborted 
marker position to AbortedTxnProcessor.
+     * @param txnID aborted transaction ID.
+     * @param position the position of the abort txnID
+     */
+    void putAbortedTxnAndPosition(TxnID txnID, PositionImpl position);
+
+    /**
+     * Clean up invalid aborted transactions.
+     */
+    void trimExpiredAbortedTxns();
+
+    /**
+     * Check whether the transaction ID is an aborted transaction ID.
+     * @param txnID the transaction ID that needs to be checked.
+     * @param readPosition the read position of the transaction message, can 
be used to find the segment.
+     * @return a boolean, whether the transaction ID is an aborted transaction 
ID.
+     */
+    boolean checkAbortedTransaction(TxnID txnID, Position readPosition);
+
+    /**
+     * Recover transaction buffer by transaction buffer snapshot.
+     * @return a Position (startReadCursorPosition) determiner where to start 
to recover in the original topic.
+     */
+
+    CompletableFuture<PositionImpl> recoverFromSnapshot();
+
+    /**
+     * Delete the transaction buffer aborted transaction snapshot.
+     * @return a completableFuture.
+     */
+    CompletableFuture<Void> deleteAbortedTxnSnapshot();
+
+    /**
+     * Take aborted transactions snapshot.
+     * @return a completableFuture.
+     */
+    CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl 
maxReadPosition);
+
+    /**
+     * Get the lastSnapshotTimestamps.
+     * @return the lastSnapshotTimestamps.
+     */
+    long getLastSnapshotTimestamps();
+
+    CompletableFuture<Void> closeAsync();
+
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index a47af54e32a..99093e42fd7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -141,9 +141,10 @@ public interface TransactionBuffer {
     /**
      * Close the buffer asynchronously.
      * @param txnID {@link TxnID} txnId.
+     * @param readPosition the persitent position of the txn message.
      * @return the txnId is aborted.
      */
-    boolean isTxnAborted(TxnID txnID);
+    boolean isTxnAborted(TxnID txnID, PositionImpl readPosition);
 
     /**
      * Sync max read position for normal publish.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index d843542c8cb..56b49f98efe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -360,7 +360,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
     }
 
     @Override
-    public boolean isTxnAborted(TxnID txnID) {
+    public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
         return false;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
new file mode 100644
index 00000000000..a13dd0499a6
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
+    private final PersistentTopic topic;
+    private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+    /**
+     * Aborts, map for jude message is aborted, linked for remove abort txn in 
memory when this
+     * position have been deleted.
+     */
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
+
+    private volatile long lastSnapshotTimestamps;
+
+    public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) {
+        this.topic = topic;
+        this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
+                .getTransactionBufferSnapshotServiceFactory()
+                
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
+    }
+
+    @Override
+    public void putAbortedTxnAndPosition(TxnID abortedTxnId, PositionImpl 
position) {
+        aborts.put(abortedTxnId, position);
+    }
+
+    //In this implementation we clear the invalid aborted txn ID one by one.
+    @Override
+    public void trimExpiredAbortedTxns() {
+        while (!aborts.isEmpty() && !((ManagedLedgerImpl) 
topic.getManagedLedger())
+                .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Topic transaction buffer clear aborted 
transaction, TxnId : {}, Position : {}",
+                        topic.getName(), aborts.firstKey(), 
aborts.get(aborts.firstKey()));
+            }
+            aborts.remove(aborts.firstKey());
+        }
+    }
+
+    @Override
+    public boolean checkAbortedTransaction(TxnID txnID, Position readPosition) 
{
+        return aborts.containsKey(txnID);
+    }
+
+
+    @Override
+    public CompletableFuture<PositionImpl> recoverFromSnapshot() {
+        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+                .getTxnBufferSnapshotService()
+                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
+                    PositionImpl startReadCursorPosition = null;
+                    try {
+                        while (reader.hasMoreEvents()) {
+                            Message<TransactionBufferSnapshot> message = 
reader.readNext();
+                            if (topic.getName().equals(message.getKey())) {
+                                TransactionBufferSnapshot 
transactionBufferSnapshot = message.getValue();
+                                if (transactionBufferSnapshot != null) {
+                                    handleSnapshot(transactionBufferSnapshot);
+                                    startReadCursorPosition = PositionImpl.get(
+                                            
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                            
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                                }
+                            }
+                        }
+                        closeReader(reader);
+                        return 
CompletableFuture.completedFuture(startReadCursorPosition);
+                    } catch (Exception ex) {
+                        log.error("[{}] Transaction buffer recover fail when 
read "
+                                + "transactionBufferSnapshot!", 
topic.getName(), ex);
+                        closeReader(reader);
+                        return FutureUtil.failedFuture(ex);
+                    }
+
+                },  
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+                        .getExecutor(this));
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAbortedTxnSnapshot() {
+        return this.takeSnapshotWriter.thenCompose(writer -> {
+            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
+            snapshot.setTopicName(topic.getName());
+            return writer.deleteAsync(snapshot.getTopicName(), snapshot);
+        }).thenRun(() -> {
+            log.info("[{}] Successes to delete the aborted transaction 
snapshot", this.topic);
+        });
+    }
+
+    @Override
+    public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl 
maxReadPosition) {
+        return takeSnapshotWriter.thenCompose(writer -> {
+            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
+            snapshot.setTopicName(topic.getName());
+            snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
+            snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
+            List<AbortTxnMetadata> list = new ArrayList<>();
+            aborts.forEach((k, v) -> {
+                AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
+                abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
+                abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
+                abortTxnMetadata.setLedgerId(v.getLedgerId());
+                abortTxnMetadata.setEntryId(v.getEntryId());
+                list.add(abortTxnMetadata);
+            });
+            snapshot.setAborts(list);
+            return writer.writeAsync(snapshot.getTopicName(), 
snapshot).thenAccept(messageId -> {
+                this.lastSnapshotTimestamps = System.currentTimeMillis();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}]Transaction buffer take snapshot success! "
+                            + "messageId : {}", topic.getName(), messageId);
+                }
+            }).exceptionally(e -> {
+                log.warn("[{}]Transaction buffer take snapshot fail! ", 
topic.getName(), e.getCause());
+                return null;
+            });
+        });
+    }
+
+    @Override
+    public long getLastSnapshotTimestamps() {
+        return this.lastSnapshotTimestamps;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return 
takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
+    }
+
+    private void 
closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
+        reader.closeAsync().exceptionally(e -> {
+            log.error("[{}]Transaction buffer reader close error!", 
topic.getName(), e);
+            return null;
+        });
+    }
+
+    private void handleSnapshot(TransactionBufferSnapshot snapshot) {
+        if (snapshot.getAborts() != null) {
+            snapshot.getAborts().forEach(abortTxnMetadata ->
+                    aborts.put(new TxnID(abortTxnMetadata.getTxnIdMostBits(),
+                                    abortTxnMetadata.getTxnIdLeastBits()),
+                            PositionImpl.get(abortTxnMetadata.getLedgerId(),
+                                    abortTxnMetadata.getEntryId())));
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index dfebbba7c66..f3bf4f95923 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import io.netty.util.TimerTask;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -38,23 +37,20 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
-import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.protocol.Commands;
@@ -79,14 +75,6 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
      */
     private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new 
LinkedMap<>();
 
-    /**
-     * Aborts, map for jude message is aborted, linked for remove abort txn in 
memory when this
-     * position have been deleted.
-     */
-    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
-
-    private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
-
     // when add abort or change max read position, the count will +1. Take 
snapshot will set 0 into it.
     private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new 
AtomicLong();
 
@@ -100,8 +88,6 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
     private final int takeSnapshotIntervalTime;
 
-    private volatile long lastSnapshotTimestamps;
-
     private final CompletableFuture<Void> transactionBufferFuture = new 
CompletableFuture<>();
 
     /**
@@ -113,18 +99,18 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
     private final Semaphore handleLowWaterMark = new Semaphore(1);
 
+    private final AbortedTxnProcessor snapshotAbortedTxnProcessor;
+
     public TopicTransactionBuffer(PersistentTopic topic) {
         super(State.None);
         this.topic = topic;
-        this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
-                .getTransactionBufferSnapshotServiceFactory()
-                
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
         this.timer = 
topic.getBrokerService().getPulsar().getTransactionTimer();
         this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
                 
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
         this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                 
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
         this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+        this.snapshotAbortedTxnProcessor = new 
SingleSnapshotAbortedTxnProcessorImpl(topic);
         this.recover();
     }
 
@@ -135,9 +121,6 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                     @Override
                     public void recoverComplete() {
                         synchronized (TopicTransactionBuffer.this) {
-                            // sync maxReadPosition change to LAC when 
TopicTransaction buffer have not recover
-                            // completely the normal message have been sent to 
broker and state is
-                            // not Ready can't sync maxReadPosition when no 
ongoing transactions
                             if (ongoingTxns.isEmpty()) {
                                 maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
                             }
@@ -160,9 +143,6 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                     @Override
                     public void noNeedToRecover() {
                         synchronized (TopicTransactionBuffer.this) {
-                            // sync maxReadPosition change to LAC when 
TopicTransaction buffer have not recover
-                            // completely the normal message have been sent to 
broker and state is
-                            // not NoSnapshot can't sync maxReadPosition
                             maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
                             if (!changeToNoSnapshotState()) {
                                 log.error("[{}]Transaction buffer recover 
fail", topic.getName());
@@ -172,20 +152,6 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                             }
                         }
                     }
-
-                    @Override
-                    public void handleSnapshot(TransactionBufferSnapshot 
snapshot) {
-                        maxReadPosition = 
PositionImpl.get(snapshot.getMaxReadPositionLedgerId(),
-                                snapshot.getMaxReadPositionEntryId());
-                        if (snapshot.getAborts() != null) {
-                            snapshot.getAborts().forEach(abortTxnMetadata ->
-                                    aborts.put(new 
TxnID(abortTxnMetadata.getTxnIdMostBits(),
-                                                    
abortTxnMetadata.getTxnIdLeastBits()),
-                                            
PositionImpl.get(abortTxnMetadata.getLedgerId(),
-                                                    
abortTxnMetadata.getEntryId())));
-                        }
-                    }
-
                     @Override
                     public void handleTxnEntry(Entry entry) {
                         ByteBuf metadataAndPayload = entry.getDataBuffer();
@@ -197,7 +163,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                             PositionImpl position = 
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                             if (Markers.isTxnMarker(msgMetadata)) {
                                 if (Markers.isTxnAbortMarker(msgMetadata)) {
-                                    aborts.put(txnID, position);
+                                    
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
                                 }
                                 updateMaxReadPosition(txnID);
                             } else {
@@ -225,7 +191,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                         
recoverTime.setRecoverEndTime(System.currentTimeMillis());
                         topic.close(true);
                     }
-                }, this.topic, this, takeSnapshotWriter));
+                }, this.topic, this, snapshotAbortedTxnProcessor));
     }
 
     @Override
@@ -241,7 +207,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
             CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
             transactionBufferFuture.thenRun(() -> {
                 if (checkIfNoSnapshot()) {
-                    takeSnapshot().thenRun(() -> {
+                    
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() 
-> {
                         if (changeToReadyStateFromNoSnapshot()) {
                             timer.newTimeout(TopicTransactionBuffer.this,
                                     takeSnapshotIntervalTime, 
TimeUnit.MILLISECONDS);
@@ -308,7 +274,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     }
 
     private void handleTransactionMessage(TxnID txnId, Position position) {
-        if (!ongoingTxns.containsKey(txnId) && !aborts.containsKey(txnId)) {
+        if (!ongoingTxns.containsKey(txnId) && 
!this.snapshotAbortedTxnProcessor
+                .checkAbortedTransaction(txnId, position)) {
             ongoingTxns.put(txnId, (PositionImpl) position);
             PositionImpl firstPosition = 
ongoingTxns.get(ongoingTxns.firstKey());
             //max read position is less than first ongoing transaction message 
position, so entryId -1
@@ -316,7 +283,6 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         }
     }
 
-
     @Override
     public CompletableFuture<TransactionBufferReader> 
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
         return null;
@@ -339,7 +305,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                         synchronized (TopicTransactionBuffer.this) {
                             updateMaxReadPosition(txnID);
                             handleLowWaterMark(txnID, lowWaterMark);
-                            clearAbortedTransactions();
+                            
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
                             takeSnapshotByChangeTimes();
                         }
                         txnCommittedCounter.increment();
@@ -383,10 +349,9 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                     @Override
                     public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
                         synchronized (TopicTransactionBuffer.this) {
-                            aborts.put(txnID, (PositionImpl) position);
                             updateMaxReadPosition(txnID);
-                            
changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
-                            clearAbortedTransactions();
+                            
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, maxReadPosition);
+                            
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
                             takeSnapshotByChangeTimes();
                         }
                         txnAbortedCounter.increment();
@@ -454,60 +419,20 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
     private void takeSnapshotByChangeTimes() {
         if (changeMaxReadPositionAndAddAbortTimes.get() >= 
takeSnapshotIntervalNumber) {
-            takeSnapshot();
+            this.changeMaxReadPositionAndAddAbortTimes.set(0);
+            
this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
         }
     }
 
     private void takeSnapshotByTimeout() {
         if (changeMaxReadPositionAndAddAbortTimes.get() > 0) {
-            takeSnapshot();
+            this.changeMaxReadPositionAndAddAbortTimes.set(0);
+            
this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(this.maxReadPosition);
         }
         this.timer.newTimeout(TopicTransactionBuffer.this,
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    private CompletableFuture<Void> takeSnapshot() {
-        changeMaxReadPositionAndAddAbortTimes.set(0);
-        return takeSnapshotWriter.thenCompose(writer -> {
-            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
-            synchronized (TopicTransactionBuffer.this) {
-                snapshot.setTopicName(topic.getName());
-                
snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
-                
snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
-                List<AbortTxnMetadata> list = new ArrayList<>();
-                aborts.forEach((k, v) -> {
-                    AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
-                    abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
-                    abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
-                    abortTxnMetadata.setLedgerId(v.getLedgerId());
-                    abortTxnMetadata.setEntryId(v.getEntryId());
-                    list.add(abortTxnMetadata);
-                });
-                snapshot.setAborts(list);
-            }
-            return writer.writeAsync(snapshot.getTopicName(), 
snapshot).thenAccept(messageId-> {
-                this.lastSnapshotTimestamps = System.currentTimeMillis();
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}]Transaction buffer take snapshot success! "
-                            + "messageId : {}", topic.getName(), messageId);
-                }
-            }).exceptionally(e -> {
-                log.warn("[{}]Transaction buffer take snapshot fail! ", 
topic.getName(), e);
-                return null;
-            });
-        });
-    }
-    private void clearAbortedTransactions() {
-        while (!aborts.isEmpty() && !((ManagedLedgerImpl) 
topic.getManagedLedger())
-                .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
-            if (log.isDebugEnabled()) {
-                aborts.firstKey();
-                log.debug("[{}] Topic transaction buffer clear aborted 
transaction, TxnId : {}, Position : {}",
-                        topic.getName(), aborts.firstKey(), 
aborts.get(aborts.firstKey()));
-            }
-            aborts.remove(aborts.firstKey());
-        }
-    }
     void updateMaxReadPosition(TxnID txnID) {
         PositionImpl preMaxReadPosition = this.maxReadPosition;
         ongoingTxns.remove(txnID);
@@ -530,22 +455,18 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
     @Override
     public CompletableFuture<Void> clearSnapshot() {
-        return this.takeSnapshotWriter.thenCompose(writer -> {
-            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
-            snapshot.setTopicName(topic.getName());
-            return writer.deleteAsync(snapshot.getTopicName(), snapshot);
-        }).thenCompose(__ -> CompletableFuture.completedFuture(null));
+        return snapshotAbortedTxnProcessor.deleteAbortedTxnSnapshot();
     }
 
     @Override
     public CompletableFuture<Void> closeAsync() {
         changeToCloseState();
-        return 
this.takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync);
+        return this.snapshotAbortedTxnProcessor.closeAsync();
     }
 
     @Override
-    public boolean isTxnAborted(TxnID txnID) {
-        return aborts.containsKey(txnID);
+    public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
+        return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID, 
readPosition);
     }
 
     @Override
@@ -554,7 +475,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         // thread is the same tread, in this time the lastAddConfirm don't 
content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                maxReadPosition = position;
+                this.maxReadPosition = position;
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
                     maxReadPosition = position;
@@ -576,7 +497,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     @Override
     public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
         TransactionInBufferStats transactionInBufferStats = new 
TransactionInBufferStats();
-        transactionInBufferStats.aborted = isTxnAborted(txnID);
+        transactionInBufferStats.aborted = isTxnAborted(txnID, null);
         if (ongoingTxns.containsKey(txnID)) {
             transactionInBufferStats.startPosition = 
ongoingTxns.get(txnID).toString();
         }
@@ -586,7 +507,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     @Override
     public TransactionBufferStats getStats(boolean lowWaterMarks) {
         TransactionBufferStats transactionBufferStats = new 
TransactionBufferStats();
-        transactionBufferStats.lastSnapshotTimestamps = 
this.lastSnapshotTimestamps;
+        transactionBufferStats.lastSnapshotTimestamps = 
this.snapshotAbortedTxnProcessor.getLastSnapshotTimestamps();
         transactionBufferStats.state = this.getState().name();
         transactionBufferStats.maxReadPosition = 
this.maxReadPosition.toString();
         if (lowWaterMarks) {
@@ -602,7 +523,9 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     @Override
     public void run(Timeout timeout) {
         if (checkIfReady()) {
-            takeSnapshotByTimeout();
+            synchronized (TopicTransactionBuffer.this) {
+                takeSnapshotByTimeout();
+            }
         }
     }
 
@@ -625,108 +548,76 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
         private final TopicTransactionBuffer topicTransactionBuffer;
 
-        private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+        private final AbortedTxnProcessor abortedTxnProcessor;
 
         private 
TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack callBack, 
PersistentTopic topic,
-                                              TopicTransactionBuffer 
transactionBuffer, CompletableFuture<
-                SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter) {
+                                              TopicTransactionBuffer 
transactionBuffer,
+                                              AbortedTxnProcessor 
abortedTxnProcessor) {
             this.topic = topic;
             this.callBack = callBack;
             this.entryQueue = new SpscArrayQueue<>(2000);
             this.topicTransactionBuffer = transactionBuffer;
-            this.takeSnapshotWriter = takeSnapshotWriter;
+            this.abortedTxnProcessor = abortedTxnProcessor;
         }
 
         @SneakyThrows
         @Override
         public void run() {
-            this.takeSnapshotWriter.thenRunAsync(() -> {
-                if (!this.topicTransactionBuffer.changeToInitializingState()) {
-                    log.warn("TransactionBuffer {} of topic {} can not change 
state to Initializing",
-                            this, topic.getName());
+            if (!this.topicTransactionBuffer.changeToInitializingState()) {
+                log.warn("TransactionBuffer {} of topic {} can not change 
state to Initializing",
+                        this, topic.getName());
+                return;
+            }
+            
abortedTxnProcessor.recoverFromSnapshot().thenAcceptAsync(startReadCursorPosition
 -> {
+                //Transaction is not use for this topic, so just make 
maxReadPosition as LAC.
+                if (startReadCursorPosition == null) {
+                    callBack.noNeedToRecover();
+                    return;
+                } else {
+                    this.startReadCursorPosition = startReadCursorPosition;
+                }
+                ManagedCursor managedCursor;
+                try {
+                    managedCursor = topic.getManagedLedger()
+                            .newNonDurableCursor(this.startReadCursorPosition, 
SUBSCRIPTION_NAME);
+                } catch (ManagedLedgerException e) {
+                    callBack.recoverExceptionally(e);
+                    log.error("[{}]Transaction buffer recover fail when open 
cursor!", topic.getName(), e);
                     return;
                 }
-                
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
-                        
.getTxnBufferSnapshotService().createReader(TopicName.get(topic.getName()))
-                        .thenAcceptAsync(reader -> {
+                PositionImpl lastConfirmedEntry =
+                        (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+                PositionImpl currentLoadPosition = (PositionImpl) 
this.startReadCursorPosition;
+                FillEntryQueueCallback fillEntryQueueCallback = new 
FillEntryQueueCallback(entryQueue,
+                        managedCursor, TopicTransactionBufferRecover.this);
+                if (lastConfirmedEntry.getEntryId() != -1) {
+                    while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
+                            && fillEntryQueueCallback.fillQueue()) {
+                        Entry entry = entryQueue.poll();
+                        if (entry != null) {
                             try {
-                                boolean hasSnapshot = false;
-                                while (reader.hasMoreEvents()) {
-                                    Message<TransactionBufferSnapshot> message 
= reader.readNext();
-                                    if 
(topic.getName().equals(message.getKey())) {
-                                        TransactionBufferSnapshot 
transactionBufferSnapshot = message.getValue();
-                                        if (transactionBufferSnapshot != null) 
{
-                                            hasSnapshot = true;
-                                            
callBack.handleSnapshot(transactionBufferSnapshot);
-                                            this.startReadCursorPosition = 
PositionImpl.get(
-                                                    
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-                                                    
transactionBufferSnapshot.getMaxReadPositionEntryId());
-                                        }
-                                    }
-                                }
-                                if (!hasSnapshot) {
-                                    closeReader(reader);
-                                    callBack.noNeedToRecover();
-                                    return;
-                                }
-                            } catch (Exception ex) {
-                                log.error("[{}] Transaction buffer recover 
fail when read "
-                                        + "transactionBufferSnapshot!", 
topic.getName(), ex);
-                                callBack.recoverExceptionally(ex);
-                                closeReader(reader);
-                                return;
+                                currentLoadPosition = 
PositionImpl.get(entry.getLedgerId(),
+                                        entry.getEntryId());
+                                callBack.handleTxnEntry(entry);
+                            } finally {
+                                entry.release();
                             }
-                            closeReader(reader);
-
-                            ManagedCursor managedCursor;
+                        } else {
                             try {
-                                managedCursor = topic.getManagedLedger()
-                                        
.newNonDurableCursor(this.startReadCursorPosition, SUBSCRIPTION_NAME);
-                            } catch (ManagedLedgerException e) {
-                                callBack.recoverExceptionally(e);
-                                log.error("[{}]Transaction buffer recover fail 
when open cursor!", topic.getName(), e);
-                                return;
-                            }
-                            PositionImpl lastConfirmedEntry =
-                                    (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
-                            PositionImpl currentLoadPosition = (PositionImpl) 
this.startReadCursorPosition;
-                            FillEntryQueueCallback fillEntryQueueCallback = 
new FillEntryQueueCallback(entryQueue,
-                                    managedCursor, 
TopicTransactionBufferRecover.this);
-                            if (lastConfirmedEntry.getEntryId() != -1) {
-                                while 
(lastConfirmedEntry.compareTo(currentLoadPosition) > 0
-                                        && fillEntryQueueCallback.fillQueue()) 
{
-                                    Entry entry = entryQueue.poll();
-                                    if (entry != null) {
-                                        try {
-                                            currentLoadPosition = 
PositionImpl.get(entry.getLedgerId(),
-                                                    entry.getEntryId());
-                                            callBack.handleTxnEntry(entry);
-                                        } finally {
-                                            entry.release();
-                                        }
-                                    } else {
-                                        try {
-                                            Thread.sleep(1);
-                                        } catch (InterruptedException e) {
-                                            //no-op
-                                        }
-                                    }
-                                }
+                                Thread.sleep(1);
+                            } catch (InterruptedException e) {
+                                //no-op
                             }
+                        }
+                    }
+                }
 
-                            closeCursor(SUBSCRIPTION_NAME);
-                            callBack.recoverComplete();
-                        }, 
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
-                                .getExecutor(this)).exceptionally(e -> {
-                            callBack.recoverExceptionally(e.getCause());
-                            log.error("[{}]Transaction buffer new snapshot 
reader fail!", topic.getName(), e);
-                            return null;
-                        });
+                closeCursor(SUBSCRIPTION_NAME);
+                callBack.recoverComplete();
             }, 
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
                     .getExecutor(this)).exceptionally(e -> {
                 callBack.recoverExceptionally(e.getCause());
-                log.error("[{}]Transaction buffer create snapshot writer 
fail!",
-                        topic.getName(), e);
+                log.error("[{}]Transaction buffer failed to recover 
snapshot!", topic.getName(), e);
                 return null;
             });
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
index 32ff0ebe16b..7dedb2cb6b7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
 import org.apache.bookkeeper.mledger.Entry;
-import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 
 public interface TopicTransactionBufferRecoverCallBack {
 
@@ -34,13 +33,6 @@ public interface TopicTransactionBufferRecoverCallBack {
      */
     void noNeedToRecover();
 
-    /**
-     * Handle transactionBufferSnapshot.
-     *
-     * @param snapshot the transaction buffer snapshot
-     */
-    void handleSnapshot(TransactionBufferSnapshot snapshot);
-
     /**
      * Handle transaction entry beyond the snapshot.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index c1d26b2eaec..22ba8e2d2e8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -79,7 +79,7 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
     }
 
     @Override
-    public boolean isTxnAborted(TxnID txnID) {
+    public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
         return false;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
index 613083e5fb7..d548835ed8f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
@@ -35,6 +35,6 @@ public class TransactionBufferSnapshotIndexes {
 
     private List<TransactionBufferSnapshotIndex> indexList;
 
-    private TransactionBufferSnapshotSegment snapshot;
+    private TransactionBufferSnapshotIndexesMetadata snapshot;
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexesMetadata.java
similarity index 75%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexesMetadata.java
index 613083e5fb7..9a468d250bb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexesMetadata.java
@@ -18,23 +18,16 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
 
-import java.util.List;
+import java.util.Set;
 import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Getter;
+import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.Setter;
 
+@Data
 @AllArgsConstructor
 @NoArgsConstructor
-@Getter
-@Setter
-@Builder
-public class TransactionBufferSnapshotIndexes {
-    private String topicName;
-
-    private List<TransactionBufferSnapshotIndex> indexList;
-
-    private TransactionBufferSnapshotSegment snapshot;
-
+public class TransactionBufferSnapshotIndexesMetadata {
+    private long maxReadPositionLedgerId;
+    private long maxReadPositionEntryId;
+    private Set<TxnIDData> aborts;
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index a4f2b6ec282..554ef1c3f96 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -130,7 +130,7 @@ public class AbstractBaseDispatcherTest {
         PersistentTopic mockTopic = mock(PersistentTopic.class);
         when(this.subscriptionMock.getTopic()).thenReturn(mockTopic);
 
-        when(mockTopic.isTxnAborted(any(TxnID.class))).thenReturn(true);
+        when(mockTopic.isTxnAborted(any(TxnID.class), any())).thenReturn(true);
 
         List<Entry> entries = new ArrayList<>();
         entries.add(EntryImpl.create(1, 1, createTnxMessage("message1", 1)));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index c1e347aa8c7..39c324d92f3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -30,16 +30,12 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import java.io.IOException;
 import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -61,6 +57,8 @@ import 
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import 
org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
@@ -128,6 +126,14 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         };
     }
 
+    @DataProvider(name = "enableSnapshotSegment")
+    public Object[] testSnapshot() {
+        return new Boolean[] {
+                true,
+                false
+        };
+    }
+
     @Test(dataProvider = "testTopic")
     private void recoverTest(String testTopic) throws Exception {
         PulsarClient pulsarClient = this.pulsarClient;
@@ -169,7 +175,7 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         Message<String> message = consumer.receive(2, TimeUnit.SECONDS);
         assertNull(message);
 
-        tnx1.commit();
+        tnx1.commit().get();
 
         // only can receive message 1
         message = consumer.receive(2, TimeUnit.SECONDS);
@@ -242,9 +248,7 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
 
     }
 
-    @Test
-    private void testTakeSnapshot() throws IOException, ExecutionException, 
InterruptedException {
-
+    private void testTakeSnapshot() throws Exception {
         @Cleanup
         Producer<String> producer = pulsarClient
                 .newProducer(Schema.STRING)
@@ -314,8 +318,9 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
 
     }
 
-    @Test
-    private void testTopicTransactionBufferDeleteAbort() throws Exception {
+    @Test(dataProvider = "enableSnapshotSegment")
+    private void testTopicTransactionBufferDeleteAbort(Boolean 
enableSnapshotSegment) throws Exception {
+        
getPulsarServiceList().get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(enableSnapshotSegment);
         @Cleanup
         Producer<String> producer = pulsarClient
                 .newProducer(Schema.STRING)
@@ -390,22 +395,34 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
                     field.setAccessible(true);
                     TopicTransactionBuffer topicTransactionBuffer =
                             (TopicTransactionBuffer) 
field.get(persistentTopic);
-                    field = 
TopicTransactionBuffer.class.getDeclaredField("aborts");
+                    field = 
TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
                     field.setAccessible(true);
-                    LinkedMap<TxnID, PositionImpl> linkedMap =
-                            (LinkedMap<TxnID, PositionImpl>) 
field.get(topicTransactionBuffer);
-                    assertEquals(linkedMap.size(), 1);
-                    
assertEquals(linkedMap.get(linkedMap.firstKey()).getLedgerId(),
-                            ((MessageIdImpl) 
message.getMessageId()).getLedgerId());
-                    exist = true;
+                    AbortedTxnProcessor abortedTxnProcessor = 
(AbortedTxnProcessor) field.get(topicTransactionBuffer);
+
+                    if (enableSnapshotSegment) {
+                        //TODO
+                        exist = true;
+                    } else {
+                        Field abortsField = 
SingleSnapshotAbortedTxnProcessorImpl.class.getDeclaredField("aborts");
+                        abortsField.setAccessible(true);
+
+                        LinkedMap<TxnID, PositionImpl> linkedMap =
+                                (LinkedMap<TxnID, PositionImpl>) 
abortsField.get(abortedTxnProcessor);
+                        assertEquals(linkedMap.size(), 1);
+                        
assertEquals(linkedMap.get(linkedMap.firstKey()).getLedgerId(),
+                                ((MessageIdImpl) 
message.getMessageId()).getLedgerId());
+                        exist = true;
+                    }
+
                 }
             }
         }
         assertTrue(exist);
     }
 
-    @Test
-    public void clearTransactionBufferSnapshotTest() throws Exception {
+    @Test(dataProvider = "enableSnapshotSegment")
+    public void clearTransactionBufferSnapshotTest(Boolean 
enableSnapshotSegment) throws Exception {
+        
getPulsarServiceList().get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(enableSnapshotSegment);
         String topic = NAMESPACE1 + "/tb-snapshot-delete-" + 
RandomUtils.nextInt();
 
         Producer<byte[]> producer = pulsarClient
@@ -426,9 +443,11 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         PersistentTopic originalTopic = (PersistentTopic) 
getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), 
false).get().get();
         TopicTransactionBuffer topicTransactionBuffer = 
(TopicTransactionBuffer) originalTopic.getTransactionBuffer();
-        Method takeSnapshotMethod = 
TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
-        takeSnapshotMethod.setAccessible(true);
-        takeSnapshotMethod.invoke(topicTransactionBuffer);
+        Field abortedTxnProcessorField = 
TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
+        abortedTxnProcessorField.setAccessible(true);
+        AbortedTxnProcessor abortedTxnProcessor =
+                (AbortedTxnProcessor) 
abortedTxnProcessorField.get(topicTransactionBuffer);
+        
abortedTxnProcessor.takeAbortedTxnsSnapshot(topicTransactionBuffer.getMaxReadPosition());
 
         TopicName transactionBufferTopicName =
                 NamespaceEventsSystemTopicFactory.getSystemTopicName(
@@ -667,8 +686,9 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         snapshot.setSequenceId(1L);
         snapshot.setMaxReadPositionLedgerId(2L);
         snapshot.setMaxReadPositionEntryId(3L);
-        snapshot.setAborts(Collections.singletonList(
-                new TxnIDData(1, 1)));
+        LinkedList<TxnIDData> txnIDSet = new LinkedList<>();
+        txnIDSet.add(new TxnIDData(1, 1));
+        snapshot.setAborts(txnIDSet );
 
         segmentWriter.write(buildKey(snapshot), snapshot);
         snapshot.setSequenceId(2L);
@@ -711,14 +731,15 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         ByteBuf headersAndPayload = entry.getDataBuffer();
         //skip metadata
         MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
-        snapshot = 
Schema.AVRO(TransactionBufferSnapshotSegment.class).decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer());
+        snapshot = Schema.AVRO(TransactionBufferSnapshotSegment.class)
+                .decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer());
 
         //verify snapshot
         assertEquals(snapshot.getTopicName(), snapshotTopic);
         assertEquals(snapshot.getSequenceId(), 2L);
         assertEquals(snapshot.getMaxReadPositionLedgerId(), 2L);
         assertEquals(snapshot.getMaxReadPositionEntryId(), 3L);
-        assertEquals(snapshot.getAborts().get(0), new TxnIDData(1, 1));
+        assertEquals(snapshot.getAborts().toArray()[0], new TxnIDData(1, 1));
     }
 
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 64dca965267..33305a2b8df 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -91,6 +91,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider;
@@ -942,9 +943,11 @@ public class TransactionTest extends TransactionTestBase {
         filed1.set(persistentTopic, managedLedger);
 
         TopicTransactionBuffer topicTransactionBuffer = 
(TopicTransactionBuffer) field2.get(persistentTopic);
-        Method method = 
TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
-        method.setAccessible(true);
-        CompletableFuture<Void> completableFuture = (CompletableFuture<Void>) 
method.invoke(topicTransactionBuffer);
+        Field processorField = 
TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
+        processorField.setAccessible(true);
+        AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor) 
processorField.get(topicTransactionBuffer);
+        CompletableFuture<Void> completableFuture = 
abortedTxnProcessor.takeAbortedTxnsSnapshot(
+                topicTransactionBuffer.getMaxReadPosition());
         completableFuture.get();
 
         
doReturn(PositionImpl.LATEST).when(managedLedger).getLastConfirmedEntry();
@@ -1026,9 +1029,15 @@ public class TransactionTest extends TransactionTestBase 
{
                 .getTopic(NAMESPACE1 + 
"/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true)
                 .get().get();
         TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
-        Field field = 
TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
-        field.setAccessible(true);
-        AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) 
field.get(buffer);
+        Field processorField = 
TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
+        processorField.setAccessible(true);
+
+        AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor) 
processorField.get(buffer);
+        Field changeTimeField = TopicTransactionBuffer
+                
.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
+        changeTimeField.setAccessible(true);
+        AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) 
changeTimeField.get(buffer);
+
         Field field1 = 
TopicTransactionBufferState.class.getDeclaredField("state");
         field1.setAccessible(true);
 

Reply via email to