This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d21176609f9 [feat][broker] Segmented transaction buffer snapshot 
segment and index system topic (#16931)
d21176609f9 is described below

commit d21176609f9cfe3e51ea5cff69296667a902fd3c
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Oct 24 15:06:51 2022 +0800

    [feat][broker] Segmented transaction buffer snapshot segment and index 
system topic (#16931)
    
    Master Issue: https://github.com/apache/pulsar/issues/16913
    ### Motivation
    Implement system topic clients for the snapshot segment topic and the index 
topic, used to send segment snapshots and indexes.
    The `transactionBufferSegmentedSnapshotEnabled` configuration is used by 
the Transaction Buffer (TB) to determine which `AbortedTxnProcessor` that TB 
adopts.
    ### Modification
    
    In the new implementation of the Transaction Buffer Snapshot system topic, 
the number of system topics that need to be processed has grown from one to 
three, each with a different schema, so we have added generics to the 
TransactionBufferSnapshotBaseSystemTopicClient class and the 
SystemTopicTxnBufferSnapshotService<T> class.
    PulsarService also maintains a factory class, 
TransactionBufferSnapshotServiceFactory, which is used to obtain the 
SystemTopicTxnBufferSnapshotService instances.
    This way, we can obtain the required System topic client through 
pulsarService to read and send snapshots.
    <img width="1336" alt="image" 
src="https://user-images.githubusercontent.com/55571188/197467173-9028e58a-79cc-4fe4-81e2-c299c568caee.png">
---
 .../apache/bookkeeper/mledger/AsyncCallbacks.java  |   7 +
 .../bookkeeper/mledger/ManagedLedgerFactory.java   |  11 ++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  52 +++--
 .../mledger/impl/ReadOnlyManagedLedgerImpl.java    |  14 +-
 .../org/apache/pulsar/broker/PulsarService.java    |  15 +-
 .../SystemTopicBasedTopicPoliciesService.java      |  20 +-
 ...va => SystemTopicTxnBufferSnapshotService.java} |  63 +++---
 .../service/TransactionBufferSnapshotService.java  |  62 ------
 .../TransactionBufferSnapshotServiceFactory.java   |  74 +++++++
 .../NamespaceEventsSystemTopicFactory.java         |  30 +--
 .../pulsar/broker/systopic/SystemTopicClient.java  |  13 +-
 .../systopic/TopicPoliciesSystemTopicClient.java   |  24 +--
 ...sactionBufferSnapshotBaseSystemTopicClient.java | 214 +++++++++++++++++++++
 .../TransactionBufferSystemTopicClient.java        | 208 --------------------
 .../buffer/impl/TopicTransactionBuffer.java        |  16 +-
 .../TopicTransactionBufferRecoverCallBack.java     |   2 +-
 .../{matadata => metadata}/AbortTxnMetadata.java   |   2 +-
 .../TransactionBufferSnapshot.java                 |   2 +-
 .../{matadata => metadata}/package-info.java       |   2 +-
 .../v2/TransactionBufferSnapshotIndex.java}        |  24 ++-
 .../v2/TransactionBufferSnapshotIndexes.java}      |  17 +-
 .../v2/TransactionBufferSnapshotSegment.java}      |  16 +-
 .../v2/TxnIDData.java}                             |  54 ++++--
 .../{matadata => metadata/v2}/package-info.java    |   5 +-
 .../NamespaceEventsSystemTopicServiceTest.java     |   3 +-
 .../TopicTransactionBufferRecoverTest.java         | 204 +++++++++++++++++---
 .../pulsar/broker/transaction/TransactionTest.java |  20 +-
 .../org/apache/pulsar/common/events/EventType.java |  12 +-
 .../pulsar/common/naming/SystemTopicNames.java     |  12 +-
 29 files changed, 732 insertions(+), 466 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 395da52b2af..78cca061c78 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Optional;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;
 
 /**
  * Definition of all the callbacks used for the ManagedLedger asynchronous API.
@@ -46,6 +47,12 @@ public interface AsyncCallbacks {
         void openReadOnlyCursorFailed(ManagedLedgerException exception, Object 
ctx);
     }
 
+    interface OpenReadOnlyManagedLedgerCallback {
+        void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl 
managedLedger, Object ctx);
+
+        void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, 
Object ctx);
+    }
+
     interface DeleteLedgerCallback {
         void deleteLedgerComplete(Object ctx);
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index f786d646fb4..e640f65e716 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -117,6 +117,17 @@ public interface ManagedLedgerFactory {
     void asyncOpenReadOnlyCursor(String managedLedgerName, Position 
startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx);
 
+    /**
+     * Asynchronous open a Read-only managedLedger.
+     * @param managedLedgerName the unique name that identifies the managed 
ledger
+     * @param callback
+     * @param config the managed ledger configuration.
+     * @param ctx opaque context
+     */
+    void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+                                
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
+                                ManagedLedgerConfig config, Object ctx);
+
     /**
      * Get the current metadata info for a managed ledger.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index f0b10ac8dbf..632b56f2a86 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -423,7 +423,29 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+                              AsyncCallbacks.OpenReadOnlyManagedLedgerCallback 
callback,
+                              ManagedLedgerConfig config, Object ctx) {
+        if (closed) {
+            callback.openReadOnlyManagedLedgerFailed(
+                    new 
ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
+        }
+        ReadOnlyManagedLedgerImpl roManagedLedger = new 
ReadOnlyManagedLedgerImpl(this,
+                bookkeeperFactory
+                        .get(new 
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                                
config.getBookKeeperEnsemblePlacementPolicyProperties())),
+                store, config, scheduledExecutor, managedLedgerName);
+        roManagedLedger.initialize().thenRun(() -> {
+            log.info("[{}] Successfully initialize Read-only managed ledger", 
managedLedgerName);
+            callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);
 
+        }).exceptionally(e -> {
+            log.error("[{}] Failed to initialize Read-only managed ledger", 
managedLedgerName, e);
+            callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) 
e.getCause(), ctx);
+            return null;
+        });
+    }
 
     @Override
     public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, 
Position startPosition,
@@ -465,28 +487,20 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
             return;
         }
         checkArgument(startPosition instanceof PositionImpl);
-        ReadOnlyManagedLedgerImpl roManagedLedger = new 
ReadOnlyManagedLedgerImpl(this,
-                bookkeeperFactory
-                        .get(new 
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
-                                
config.getBookKeeperEnsemblePlacementPolicyProperties())),
-                store, config, scheduledExecutor, managedLedgerName);
-
-        roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition)
-                .thenAccept(roCursor -> 
callback.openReadOnlyCursorComplete(roCursor, ctx))
-                .exceptionally(ex -> {
-            Throwable t = ex;
-            if (t instanceof CompletionException) {
-                t = ex.getCause();
+        AsyncCallbacks.OpenReadOnlyManagedLedgerCallback 
openReadOnlyManagedLedgerCallback =
+                new AsyncCallbacks.OpenReadOnlyManagedLedgerCallback() {
+            @Override
+            public void 
openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl 
readOnlyManagedLedger, Object ctx) {
+                callback.openReadOnlyCursorComplete(readOnlyManagedLedger.
+                        createReadOnlyCursor((PositionImpl) startPosition), 
ctx);
             }
 
-            if (t instanceof ManagedLedgerException) {
-                callback.openReadOnlyCursorFailed((ManagedLedgerException) t, 
ctx);
-            } else {
-                callback.openReadOnlyCursorFailed(new 
ManagedLedgerException(t), ctx);
+            @Override
+            public void openReadOnlyManagedLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
+                callback.openReadOnlyCursorFailed(exception, ctx);
             }
-
-            return null;
-        });
+        };
+        asyncOpenReadOnlyManagedLedger(managedLedgerName, 
openReadOnlyManagedLedgerCallback, config, null);
     }
 
     void close(ManagedLedger ledger) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 214c3afe1bc..8878b2aece0 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -45,8 +45,8 @@ public class ReadOnlyManagedLedgerImpl extends 
ManagedLedgerImpl {
         super(factory, bookKeeper, store, config, scheduledExecutor, name);
     }
 
-    CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl 
startPosition) {
-        CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
+    CompletableFuture<Void> initialize() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
 
         // Fetch the list of existing ledgers in the managed ledger
         store.getManagedLedgerInfo(name, false, new 
MetaStoreCallback<ManagedLedgerInfo>() {
@@ -72,7 +72,7 @@ public class ReadOnlyManagedLedgerImpl extends 
ManagedLedgerImpl {
                                             
.setTimestamp(clock.millis()).build();
                                     ledgers.put(lastLedgerId, info);
 
-                                    
future.complete(createReadOnlyCursor(startPosition));
+                                    future.complete(null);
                                 }).exceptionally(ex -> {
                                     if (ex instanceof CompletionException
                                             && ex.getCause() instanceof 
IllegalArgumentException) {
@@ -80,7 +80,7 @@ public class ReadOnlyManagedLedgerImpl extends 
ManagedLedgerImpl {
                                         LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lastLedgerId)
                                                 
.setEntries(0).setSize(0).setTimestamp(clock.millis()).build();
                                         ledgers.put(lastLedgerId, info);
-                                        
future.complete(createReadOnlyCursor(startPosition));
+                                        future.complete(null);
                                     } else {
                                         future.completeExceptionally(new 
ManagedLedgerException(ex));
                                     }
@@ -93,7 +93,7 @@ public class ReadOnlyManagedLedgerImpl extends 
ManagedLedgerImpl {
                                     LedgerInfo info = 
LedgerInfo.newBuilder().setLedgerId(lastLedgerId).setEntries(0)
                                             
.setSize(0).setTimestamp(clock.millis()).build();
                                     ledgers.put(lastLedgerId, info);
-                                    
future.complete(createReadOnlyCursor(startPosition));
+                                    future.complete(null);
                                 } else {
                                     future.completeExceptionally(new 
ManagedLedgerException(ex));
                                 }
@@ -101,7 +101,7 @@ public class ReadOnlyManagedLedgerImpl extends 
ManagedLedgerImpl {
                             });
                 } else {
                     // The read-only managed ledger is ready to use
-                    future.complete(createReadOnlyCursor(startPosition));
+                    future.complete(null);
                 }
             }
 
@@ -118,7 +118,7 @@ public class ReadOnlyManagedLedgerImpl extends 
ManagedLedgerImpl {
         return future;
     }
 
-    private ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
+    ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
         if (ledgers.isEmpty()) {
             lastConfirmedEntry = PositionImpl.EARLIEST;
         } else if (ledgers.lastEntry().getValue().getEntries() > 0) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f6aec263f13..02a4bdcd31d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -98,11 +98,10 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.rest.Topics;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
-import 
org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
 import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
-import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
+import 
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
@@ -260,8 +259,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private MetadataStoreExtended localMetadataStore;
     private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
     private CoordinationService coordinationService;
-    private TransactionBufferSnapshotService transactionBufferSnapshotService;
-
+    private TransactionBufferSnapshotServiceFactory 
transactionBufferSnapshotServiceFactory;
     private MetadataStore configurationMetadataStore;
     private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
     private boolean shouldShutdownConfigurationMetadataStore;
@@ -510,9 +508,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 adminClient = null;
             }
 
-            if (transactionBufferSnapshotService != null) {
-                transactionBufferSnapshotService.close();
-                transactionBufferSnapshotService = null;
+            if (transactionBufferSnapshotServiceFactory != null) {
+                transactionBufferSnapshotServiceFactory.close();
+                transactionBufferSnapshotServiceFactory = null;
             }
 
             if (transactionBufferClient != null) {
@@ -837,7 +835,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
                 
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
 
-                this.transactionBufferSnapshotService = new 
SystemTopicBaseTxnBufferSnapshotService(getClient());
+                this.transactionBufferSnapshotServiceFactory = new 
TransactionBufferSnapshotServiceFactory(getClient());
+
                 this.transactionTimer =
                         new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-transaction-timer"));
                 transactionBufferClient = 
TransactionBufferClientImpl.create(this, transactionTimer,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 4bd3a0952f0..90c58f5910f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -123,7 +123,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             } else {
                 PulsarEvent event = getPulsarEvent(topicName, actionType, 
policies);
                 CompletableFuture<MessageId> actionFuture =
-                        ActionType.DELETE.equals(actionType) ? 
writer.deleteAsync(event) : writer.writeAsync(event);
+                        ActionType.DELETE.equals(actionType) ? 
writer.deleteAsync(getEventKey(event), event)
+                                : writer.writeAsync(getEventKey(event), event);
                 actionFuture.whenComplete(((messageId, e) -> {
                             if (e != null) {
                                 result.completeExceptionally(e);
@@ -455,7 +456,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
                             
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
                     systemTopicClient.newWriterAsync().thenAccept(writer
-                            -> writer.deleteAsync(getPulsarEvent(topicName, 
ActionType.DELETE, null))
+                            -> writer.deleteAsync(getEventKey(topicName),
+                                    getPulsarEvent(topicName, 
ActionType.DELETE, null))
                             .whenComplete((result, e) -> 
writer.closeAsync().whenComplete((res, ex) -> {
                                 if (ex != null) {
                                     log.error("close writer failed ", ex);
@@ -539,6 +541,20 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
+    public static String getEventKey(PulsarEvent event) {
+        return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
+                event.getTopicPoliciesEvent().getTenant(),
+                event.getTopicPoliciesEvent().getNamespace(),
+                event.getTopicPoliciesEvent().getTopic()).toString();
+    }
+
+    public static String getEventKey(TopicName topicName) {
+        return TopicName.get(topicName.getDomain().toString(),
+                topicName.getTenant(),
+                topicName.getNamespace(),
+                
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
+    }
+
     @VisibleForTesting
     long getPoliciesCacheSize() {
         return policiesCache.size();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
similarity index 61%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index 719d492a524..9dc1fa1b8a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -23,64 +23,63 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import 
org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class SystemTopicBaseTxnBufferSnapshotService implements 
TransactionBufferSnapshotService {
+public class SystemTopicTxnBufferSnapshotService<T> {
 
-    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> 
clients;
+    protected final Map<TopicName, SystemTopicClient<T>> clients;
+    protected final NamespaceEventsSystemTopicFactory 
namespaceEventsSystemTopicFactory;
 
-    private final NamespaceEventsSystemTopicFactory 
namespaceEventsSystemTopicFactory;
+    protected final Class<T> schemaType;
+    protected final EventType systemTopicType;
 
-    public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+    public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType 
systemTopicType,
+                                               Class<T> schemaType) {
         this.namespaceEventsSystemTopicFactory = new 
NamespaceEventsSystemTopicFactory(client);
+        this.systemTopicType = systemTopicType;
+        this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
     }
 
-    @Override
-    public CompletableFuture<Writer<TransactionBufferSnapshot>> 
createWriter(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Writer<T>> 
createWriter(TopicName topicName) {
         return 
getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
     }
 
-    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> 
getTransactionBufferSystemTopicClient(
-            TopicName topicName) {
-        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
-                .getSystemTopicName(topicName.getNamespaceObject(), 
EventType.TRANSACTION_BUFFER_SNAPSHOT);
-        if (systemTopicName == null) {
-            return FutureUtil.failedFuture(
-                    new InvalidTopicNameException("Can't create 
SystemTopicBaseTxnBufferSnapshotService, "
-                            + "because the topicName is null!"));
-        }
-        return 
CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
-                (v) -> namespaceEventsSystemTopicFactory
-                        
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), 
this)));
-    }
-
-    @Override
-    public CompletableFuture<Reader<TransactionBufferSnapshot>> 
createReader(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Reader<T>> 
createReader(TopicName topicName) {
         return 
getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
     }
 
-    @Override
-    public void removeClient(TopicName topicName,
-                                          TransactionBufferSystemTopicClient 
transactionBufferSystemTopicClient) {
+    public void removeClient(TopicName topicName, SystemTopicClientBase<T> 
transactionBufferSystemTopicClient) {
         if (transactionBufferSystemTopicClient.getReaders().size() == 0
                 && transactionBufferSystemTopicClient.getWriters().size() == 
0) {
             clients.remove(topicName);
         }
     }
 
-    @Override
+    protected CompletableFuture<SystemTopicClient<T>> 
getTransactionBufferSystemTopicClient(TopicName topicName) {
+        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
+                .getSystemTopicName(topicName.getNamespaceObject(), 
systemTopicType);
+        if (systemTopicName == null) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException
+                            .InvalidTopicNameException("Can't create 
SystemTopicBaseTxnBufferSnapshotIndexService, "
+                            + "because the topicName is null!"));
+        }
+        return 
CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
+                (v) -> namespaceEventsSystemTopicFactory
+                        
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(),
+                                this, schemaType)));
+    }
+
     public void close() throws Exception {
-        for (Map.Entry<TopicName, 
SystemTopicClient<TransactionBufferSnapshot>> entry : clients.entrySet()) {
+        for (Map.Entry<TopicName, SystemTopicClient<T>> entry : 
clients.entrySet()) {
             entry.getValue().close();
         }
     }
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotService.java
deleted file mode 100644
index b090e8fe46a..00000000000
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotService.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
-import org.apache.pulsar.common.naming.TopicName;
-
-public interface TransactionBufferSnapshotService {
-
-    /**
-     * Create a transaction buffer snapshot writer.
-     *
-     * @param topicName {@link TopicName} the topic name
-     *
-     * @return {@link CompletableFuture<Writer>} return the future of writer
-     */
-    CompletableFuture<Writer<TransactionBufferSnapshot>> 
createWriter(TopicName topicName);
-
-    /**
-     * Create a transaction buffer snapshot reader.
-     *
-     * @param topicName {@link TopicName} the topic name
-     *
-     * @return {@link CompletableFuture<Writer>} return the future of reader
-     */
-    CompletableFuture<Reader<TransactionBufferSnapshot>> 
createReader(TopicName topicName);
-
-    /**
-     * Remove a topic client from cache.
-     *
-     * @param topicName {@link TopicName} the topic name
-     * @param transactionBufferSystemTopicClient {@link 
TransactionBufferSystemTopicClient} the topic client
-     *
-     */
-    void removeClient(TopicName topicName, TransactionBufferSystemTopicClient 
transactionBufferSystemTopicClient);
-
-    /**
-     * Close transaction buffer snapshot service.
-     */
-    void close() throws Exception;
-
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
new file mode 100644
index 00000000000..2220c203237
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.events.EventType;
+
+public class TransactionBufferSnapshotServiceFactory {
+
+    private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> 
txnBufferSnapshotService;
+
+    private 
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
+            txnBufferSnapshotSegmentService;
+
+    private 
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> 
txnBufferSnapshotIndexService;
+
+    public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient) {
+        this.txnBufferSnapshotSegmentService = new 
SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+                EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
+                TransactionBufferSnapshotSegment.class);
+        this.txnBufferSnapshotIndexService = new 
SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+                EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, 
TransactionBufferSnapshotIndexes.class);
+        this.txnBufferSnapshotService = new 
SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+                EventType.TRANSACTION_BUFFER_SNAPSHOT, 
TransactionBufferSnapshot.class);
+    }
+
+    public 
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> 
getTxnBufferSnapshotIndexService() {
+        return this.txnBufferSnapshotIndexService;
+    }
+
+    public 
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
+    getTxnBufferSnapshotSegmentService() {
+        return this.txnBufferSnapshotSegmentService;
+    }
+
+    public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> 
getTxnBufferSnapshotService() {
+        return this.txnBufferSnapshotService;
+    }
+
+    public void close() throws Exception {
+        if (this.txnBufferSnapshotIndexService != null) {
+            this.txnBufferSnapshotIndexService.close();
+            this.txnBufferSnapshotIndexService = null;
+        }
+        if (this.txnBufferSnapshotSegmentService != null) {
+            this.txnBufferSnapshotSegmentService.close();
+            this.txnBufferSnapshotSegmentService = null;
+        }
+        if (this.txnBufferSnapshotService != null) {
+            this.txnBufferSnapshotService.close();
+            this.txnBufferSnapshotService = null;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
index 9e162f741b2..c86ad70f6fc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.systopic;
 
-import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -43,25 +43,27 @@ public class NamespaceEventsSystemTopicFactory {
         return new TopicPoliciesSystemTopicClient(client, topicName);
     }
 
-    public TransactionBufferSystemTopicClient 
createTransactionBufferSystemTopicClient(NamespaceName namespaceName,
-                                                   
TransactionBufferSnapshotService transactionBufferSnapshotService) {
+    /**
+     * Creates a system topic client for transaction buffer snapshot payloads of type {@code T}.
+     *
+     * <p>NOTE(review): the topic is always built from {@code SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT},
+     * whatever {@code schemaType} is. Confirm that segment and index clients are not meant to target
+     * {@code TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS} / {@code TRANSACTION_BUFFER_SNAPSHOT_INDEXES} instead.
+     *
+     * @param namespaceName namespace whose snapshot system topic the client is bound to
+     * @param systemTopicTxnBufferSnapshotService service handed to the new client
+     * @param schemaType class of the payload, handed to the new client
+     * @param <T> payload type of the snapshot system topic
+     * @return a new client for the snapshot system topic of {@code namespaceName}
+     */
+    public <T> TransactionBufferSnapshotBaseSystemTopicClient<T> createTransactionBufferSystemTopicClient(
+            NamespaceName namespaceName, SystemTopicTxnBufferSnapshotService<T>
+            systemTopicTxnBufferSnapshotService, Class<T> schemaType) {
         TopicName topicName = TopicName.get(TopicDomain.persistent.value(), 
namespaceName,
                 SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
         log.info("Create transaction buffer snapshot client, topicName : {}", 
topicName.toString());
-        return new TransactionBufferSystemTopicClient(client, topicName, 
transactionBufferSnapshotService);
+        // Diamond instead of a raw type, so the result is checked against the declared <T>.
+        return new TransactionBufferSnapshotBaseSystemTopicClient<>(client, topicName,
+                systemTopicTxnBufferSnapshotService, schemaType);
     }
 
+    /**
+     * Resolves the persistent system topic of {@code namespaceName} that belongs to {@code eventType}.
+     *
+     * @param namespaceName namespace that owns the system topic
+     * @param eventType event type selecting which system topic local name is used
+     * @return the system topic name for the given namespace and event type
+     */
     public static TopicName getSystemTopicName(NamespaceName namespaceName, 
EventType eventType) {
-        switch (eventType) {
-            case TOPIC_POLICY:
-                return TopicName.get(TopicDomain.persistent.value(), 
namespaceName,
-                        SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
-            case TRANSACTION_BUFFER_SNAPSHOT:
-                return TopicName.get(TopicDomain.persistent.value(), 
namespaceName,
-                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
-            default:
-                return null;
-        }
+        // No default branch: a switch expression over an enum must list every constant to compile,
+        // so a newly added EventType has to be mapped here explicitly.
+        return switch (eventType) {
+            case TOPIC_POLICY -> TopicName.get(TopicDomain.persistent.value(), 
namespaceName,
+                    SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
+            case TRANSACTION_BUFFER_SNAPSHOT -> 
TopicName.get(TopicDomain.persistent.value(), namespaceName,
+                    SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+            case TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS -> 
TopicName.get(TopicDomain.persistent.value(), namespaceName,
+                    SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+            case TRANSACTION_BUFFER_SNAPSHOT_INDEXES -> 
TopicName.get(TopicDomain.persistent.value(), namespaceName,
+                    SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+        };
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(NamespaceEventsSystemTopicFactory.class);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index 2bc740a41d4..11dfeff2008 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -87,37 +87,42 @@ public interface SystemTopicClient<T> {
      * Writer for system topic.
      */
     interface Writer<T> {
+
         /**
          * Write event to the system topic.
+         * @param key the key of the event; used as the message key by the writers visible in this package
          * @param t pulsar event
          * @return message id
          * @throws PulsarClientException exception while write event cause
          */
-        MessageId write(T t) throws PulsarClientException;
+        MessageId write(String key, T t) throws PulsarClientException;
 
         /**
          * Async write event to the system topic.
+         * @param key the key of the event; used as the message key by the writers visible in this package
          * @param t pulsar event
          * @return message id future
          */
-        CompletableFuture<MessageId> writeAsync(T t);
+        CompletableFuture<MessageId> writeAsync(String key, T t);
 
         /**
          * Delete event in the system topic.
+         * The default implementation always throws {@link UnsupportedOperationException}.
+         *
+         * @param key the key of the event
          * @param t pulsar event
          * @return message id
          * @throws PulsarClientException exception while write event cause
          */
-        default MessageId delete(T t) throws PulsarClientException {
+        default MessageId delete(String key, T t) throws PulsarClientException 
{
             throw new UnsupportedOperationException("Unsupported operation");
         }
 
         /**
          * Async delete event in the system topic.
+         * The default implementation throws {@link UnsupportedOperationException} directly from the call;
+         * it does not return a failed future.
+         *
+         * @param key the key of the event
          * @param t pulsar event
          * @return message id future
          */
-        default CompletableFuture<MessageId> deleteAsync(T t) {
+        default CompletableFuture<MessageId> deleteAsync(String key, T t) {
             throw new UnsupportedOperationException("Unsupported operation");
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index 4ad2137d80b..1efa47ff81c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -85,41 +85,35 @@ public class TopicPoliciesSystemTopicClient extends 
SystemTopicClientBase<Pulsar
         }
 
+        /**
+         * Synchronously sends {@code event} using the caller-supplied {@code key} as the message key.
+         * The writer no longer derives the key from the event itself.
+         */
         @Override
-        public MessageId write(PulsarEvent event) throws PulsarClientException 
{
-            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(getEventKey(event)).value(event);
+        public MessageId write(String key, PulsarEvent event) throws 
PulsarClientException {
+            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(key).value(event);
             setReplicateCluster(event, builder);
             return builder.send();
         }
 
+        /**
+         * Asynchronously sends {@code event} using the caller-supplied {@code key} as the message key.
+         * The writer no longer derives the key from the event itself.
+         */
         @Override
-        public CompletableFuture<MessageId> writeAsync(PulsarEvent event) {
-            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(getEventKey(event)).value(event);
+        public CompletableFuture<MessageId> writeAsync(String key, PulsarEvent 
event) {
+            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(key).value(event);
             setReplicateCluster(event, builder);
             return builder.sendAsync();
         }
 
         @Override
-        public MessageId delete(PulsarEvent event) throws 
PulsarClientException {
-            validateActionType(event);
-            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(getEventKey(event)).value(null);
+        public MessageId delete(String key, PulsarEvent event) throws 
PulsarClientException {
+            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(key).value(null);
             setReplicateCluster(event, builder);
             return builder.send();
         }
 
+        /**
+         * Asynchronously sends a message with {@code key} and a {@code null} value, after
+         * {@code validateActionType(event)} has been called on the event.
+         */
         @Override
-        public CompletableFuture<MessageId> deleteAsync(PulsarEvent event) {
+        public CompletableFuture<MessageId> deleteAsync(String key, 
PulsarEvent event) {
             validateActionType(event);
-            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(getEventKey(event)).value(null);
+            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(key).value(null);
             setReplicateCluster(event, builder);
             return builder.sendAsync();
         }
 
-        private String getEventKey(PulsarEvent event) {
-            return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
-                event.getTopicPoliciesEvent().getTenant(),
-                event.getTopicPoliciesEvent().getNamespace(),
-                event.getTopicPoliciesEvent().getTopic()).toString();
-        }
+
 
         @Override
         public void close() throws IOException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
new file mode 100644
index 00000000000..11908512d4f
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Generic system topic client for transaction buffer snapshot payloads of type {@code T}.
+ *
+ * <p>Producers and readers are created with an AVRO schema built from {@code schemaType}. When a writer or
+ * reader is closed it is removed from this client and the owning
+ * {@link SystemTopicTxnBufferSnapshotService} is notified through {@code removeClient}.
+ *
+ * @param <T> type of the payload stored in the system topic
+ */
+@Slf4j
+public class TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTopicClientBase<T> {
+
+    protected final SystemTopicTxnBufferSnapshotService<T> systemTopicTxnBufferSnapshotService;
+    protected final Class<T> schemaType;
+
+    /**
+     * @param client Pulsar client used to create producers and readers
+     * @param topicName system topic this client is bound to
+     * @param systemTopicTxnBufferSnapshotService service notified when a writer or reader is removed
+     * @param schemaType payload class used to build the AVRO schema
+     */
+    public TransactionBufferSnapshotBaseSystemTopicClient(PulsarClient client,
+                                                          TopicName topicName,
+                                                          SystemTopicTxnBufferSnapshotService<T>
+                                                                  systemTopicTxnBufferSnapshotService,
+                                                          Class<T> schemaType) {
+        super(client, topicName);
+        this.systemTopicTxnBufferSnapshotService = systemTopicTxnBufferSnapshotService;
+        this.schemaType = schemaType;
+    }
+
+    /**
+     * Removes {@code writer} from this client and notifies the snapshot service.
+     *
+     * @param writer writer that has been closed
+     */
+    protected void removeWriter(Writer<T> writer) {
+        writers.remove(writer);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    /**
+     * Removes {@code reader} from this client and notifies the snapshot service.
+     *
+     * @param reader reader that has been closed
+     */
+    protected void removeReader(Reader<T> reader) {
+        readers.remove(reader);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    /**
+     * Writer that sends payloads through a {@link Producer}, using the supplied key as message key.
+     *
+     * @param <T> type of the payload
+     */
+    protected static class TransactionBufferSnapshotWriter<T> implements Writer<T> {
+
+        protected final Producer<T> producer;
+        protected final TransactionBufferSnapshotBaseSystemTopicClient<T>
+                transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotWriter(Producer<T> producer,
+                                                  TransactionBufferSnapshotBaseSystemTopicClient<T>
+                                                    transactionBufferSnapshotBaseSystemTopicClient) {
+            this.producer = producer;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public MessageId write(String key, T t)
+                throws PulsarClientException {
+            return producer.newMessage().key(key)
+                    .value(t).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(String key, T t) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(t).sendAsync();
+        }
+
+        /** Sends a message with {@code key} and a {@code null} value; {@code t} is not read. */
+        @Override
+        public MessageId delete(String key, T t)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .send();
+        }
+
+        /** Asynchronous variant of {@link #delete(String, Object)}; {@code t} is not read. */
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(String key, T t) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .sendAsync();
+        }
+
+        /**
+         * Closes the writer and waits for completion.
+         *
+         * @throws IOException if closing the underlying producer fails
+         */
+        @Override
+        public void close() throws IOException {
+            try {
+                this.closeAsync().join();
+            } catch (java.util.concurrent.CompletionException e) {
+                // join() reports failures unchecked; surface them as the declared IOException.
+                throw toIOException(e);
+            }
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            producer.closeAsync().whenComplete((v, e) -> {
+                // Even if closing fails, the writer still has to be removed from the client.
+                transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
+            });
+            return completableFuture;
+        }
+
+        @Override
+        public SystemTopicClient<T> getSystemTopicClient() {
+            return transactionBufferSnapshotBaseSystemTopicClient;
+        }
+    }
+
+    /**
+     * Reader that delegates to a Pulsar client {@code Reader}.
+     *
+     * @param <T> type of the payload
+     */
+    protected static class TransactionBufferSnapshotReader<T> implements Reader<T> {
+
+        private final org.apache.pulsar.client.api.Reader<T> reader;
+        private final TransactionBufferSnapshotBaseSystemTopicClient<T> transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotReader(
+                org.apache.pulsar.client.api.Reader<T> reader,
+                TransactionBufferSnapshotBaseSystemTopicClient<T> transactionBufferSnapshotBaseSystemTopicClient) {
+            this.reader = reader;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public Message<T> readNext() throws PulsarClientException {
+            return reader.readNext();
+        }
+
+        @Override
+        public CompletableFuture<Message<T>> readNextAsync() {
+            return reader.readNextAsync();
+        }
+
+        @Override
+        public boolean hasMoreEvents() throws PulsarClientException {
+            return reader.hasMessageAvailable();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasMoreEventsAsync() {
+            return reader.hasMessageAvailableAsync();
+        }
+
+        /**
+         * Closes the reader and waits for completion.
+         *
+         * @throws IOException if closing the underlying reader fails
+         */
+        @Override
+        public void close() throws IOException {
+            try {
+                this.closeAsync().join();
+            } catch (java.util.concurrent.CompletionException e) {
+                // join() reports failures unchecked; surface them as the declared IOException.
+                throw toIOException(e);
+            }
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            reader.closeAsync().whenComplete((v, e) -> {
+                // Even if closing fails, the reader still has to be removed from the client.
+                transactionBufferSnapshotBaseSystemTopicClient.removeReader(this);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
+            });
+            return completableFuture;
+        }
+
+        @Override
+        public SystemTopicClient<T> getSystemTopic() {
+            return transactionBufferSnapshotBaseSystemTopicClient;
+        }
+    }
+
+    @Override
+    protected CompletableFuture<Writer<T>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(schemaType))
+                .topic(topicName.toString())
+                .createAsync().thenApply(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new {} writer is created", topicName, schemaType.getName());
+                    }
+                    return new TransactionBufferSnapshotWriter<>(producer, this);
+                });
+    }
+
+    @Override
+    protected CompletableFuture<Reader<T>> newReaderAsyncInternal() {
+        return client.newReader(Schema.AVRO(schemaType))
+                .topic(topicName.toString())
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .createAsync()
+                .thenApply(reader -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new {} reader is created", topicName, schemaType.getName());
+                    }
+                    return new TransactionBufferSnapshotReader<>(reader, this);
+                });
+    }
+
+    /**
+     * Converts the failure reported by {@code CompletableFuture.join()} into an {@link IOException}.
+     * An {@link IOException} cause is returned as is; any other cause is wrapped.
+     */
+    private static IOException toIOException(java.util.concurrent.CompletionException e) {
+        Throwable cause = e;
+        while (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null) {
+            cause = cause.getCause();
+        }
+        return cause instanceof IOException ? (IOException) cause : new IOException(cause);
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
deleted file mode 100644
index aaab858ab1e..00000000000
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.broker.systopic;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
-import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.naming.TopicName;
-
-@Slf4j
-public class TransactionBufferSystemTopicClient extends 
SystemTopicClientBase<TransactionBufferSnapshot> {
-    private TransactionBufferSnapshotService transactionBufferSnapshotService;
-
-    public TransactionBufferSystemTopicClient(PulsarClient client, TopicName 
topicName,
-                                              TransactionBufferSnapshotService 
transactionBufferSnapshotService) {
-        super(client, topicName);
-        this.transactionBufferSnapshotService = 
transactionBufferSnapshotService;
-    }
-
-    @Override
-    protected CompletableFuture<Writer<TransactionBufferSnapshot>> 
newWriterAsyncInternal() {
-        return client.newProducer(Schema.AVRO(TransactionBufferSnapshot.class))
-                .topic(topicName.toString())
-                .createAsync().thenCompose(producer -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] A new transactionBufferSnapshot writer 
is created", topicName);
-                    }
-                    return CompletableFuture.completedFuture(
-                            new TransactionBufferSnapshotWriter(producer, 
this));
-                });
-    }
-
-    @Override
-    protected CompletableFuture<Reader<TransactionBufferSnapshot>> 
newReaderAsyncInternal() {
-        return client.newReader(Schema.AVRO(TransactionBufferSnapshot.class))
-                .topic(topicName.toString())
-                .startMessageId(MessageId.earliest)
-                .readCompacted(true)
-                .createAsync()
-                .thenCompose(reader -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] A new transactionBufferSnapshot buffer 
reader is created", topicName);
-                    }
-                    return CompletableFuture.completedFuture(
-                            new TransactionBufferSnapshotReader(reader, this));
-                });
-    }
-
-    protected void removeWriter(TransactionBufferSnapshotWriter writer) {
-        writers.remove(writer);
-        this.transactionBufferSnapshotService.removeClient(topicName, this);
-    }
-
-    protected void removeReader(TransactionBufferSnapshotReader reader) {
-        readers.remove(reader);
-        this.transactionBufferSnapshotService.removeClient(topicName, this);
-    }
-
-    private static class TransactionBufferSnapshotWriter implements 
Writer<TransactionBufferSnapshot> {
-
-        private final Producer<TransactionBufferSnapshot> producer;
-        private final TransactionBufferSystemTopicClient 
transactionBufferSystemTopicClient;
-
-        private 
TransactionBufferSnapshotWriter(Producer<TransactionBufferSnapshot> producer,
-                                                
TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
-            this.producer = producer;
-            this.transactionBufferSystemTopicClient = 
transactionBufferSystemTopicClient;
-        }
-
-        @Override
-        public MessageId write(TransactionBufferSnapshot 
transactionBufferSnapshot) throws PulsarClientException {
-            return 
producer.newMessage().key(transactionBufferSnapshot.getTopicName())
-                    .value(transactionBufferSnapshot).send();
-        }
-
-        @Override
-        public CompletableFuture<MessageId> 
writeAsync(TransactionBufferSnapshot transactionBufferSnapshot) {
-            return 
producer.newMessage().key(transactionBufferSnapshot.getTopicName())
-                    .value(transactionBufferSnapshot).sendAsync();
-        }
-
-        @Override
-        public MessageId delete(TransactionBufferSnapshot 
transactionBufferSnapshot) throws PulsarClientException {
-            return producer.newMessage()
-                    .key(transactionBufferSnapshot.getTopicName())
-                    .value(null)
-                    .send();
-        }
-
-        @Override
-        public CompletableFuture<MessageId> 
deleteAsync(TransactionBufferSnapshot transactionBufferSnapshot) {
-            return producer.newMessage()
-                    .key(transactionBufferSnapshot.getTopicName())
-                    .value(null)
-                    .sendAsync();
-        }
-
-        @Override
-        public void close() throws IOException {
-            this.producer.close();
-            transactionBufferSystemTopicClient.removeWriter(this);
-        }
-
-        @Override
-        public CompletableFuture<Void> closeAsync() {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            producer.closeAsync().whenComplete((v, e) -> {
-                // if close fail, also need remove the producer
-                transactionBufferSystemTopicClient.removeWriter(this);
-                if (e != null) {
-                    completableFuture.completeExceptionally(e);
-                    return;
-                }
-                completableFuture.complete(null);
-            });
-            return completableFuture;
-        }
-
-        @Override
-        public SystemTopicClient<TransactionBufferSnapshot> 
getSystemTopicClient() {
-            return transactionBufferSystemTopicClient;
-        }
-    }
-
-    private static class TransactionBufferSnapshotReader implements 
Reader<TransactionBufferSnapshot> {
-
-        private final 
org.apache.pulsar.client.api.Reader<TransactionBufferSnapshot> reader;
-        private final TransactionBufferSystemTopicClient 
transactionBufferSystemTopicClient;
-
-        private 
TransactionBufferSnapshotReader(org.apache.pulsar.client.api.Reader<TransactionBufferSnapshot>
 reader,
-                                                
TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
-            this.reader = reader;
-            this.transactionBufferSystemTopicClient = 
transactionBufferSystemTopicClient;
-        }
-
-        @Override
-        public Message<TransactionBufferSnapshot> readNext() throws 
PulsarClientException {
-            return reader.readNext();
-        }
-
-        @Override
-        public CompletableFuture<Message<TransactionBufferSnapshot>> 
readNextAsync() {
-            return reader.readNextAsync();
-        }
-
-        @Override
-        public boolean hasMoreEvents() throws PulsarClientException {
-            return reader.hasMessageAvailable();
-        }
-
-        @Override
-        public CompletableFuture<Boolean> hasMoreEventsAsync() {
-            return reader.hasMessageAvailableAsync();
-        }
-
-        @Override
-        public void close() throws IOException {
-            this.reader.close();
-            transactionBufferSystemTopicClient.removeReader(this);
-        }
-
-        @Override
-        public CompletableFuture<Void> closeAsync() {
-            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            reader.closeAsync().whenComplete((v, e) -> {
-                // if close fail, also need remove the reader
-                transactionBufferSystemTopicClient.removeReader(this);
-                if (e != null) {
-                    completableFuture.completeExceptionally(e);
-                    return;
-                }
-                completableFuture.complete(null);
-            });
-            return completableFuture;
-        }
-
-        @Override
-        public SystemTopicClient<TransactionBufferSnapshot> getSystemTopic() {
-            return transactionBufferSystemTopicClient;
-        }
-    }
-}
-
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index ad778137001..1245c7d8129 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -48,8 +48,8 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
-import org.apache.pulsar.broker.transaction.buffer.matadata.AbortTxnMetadata;
-import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -117,7 +117,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         super(State.None);
         this.topic = topic;
         this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
-                
.getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
+                .getTransactionBufferSnapshotServiceFactory()
+                
.getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
         this.timer = 
topic.getBrokerService().getPulsar().getTransactionTimer();
         this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
                 
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
@@ -484,7 +485,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 });
                 snapshot.setAborts(list);
             }
-            return writer.writeAsync(snapshot).thenAccept(messageId-> {
+            return writer.writeAsync(snapshot.getTopicName(), 
snapshot).thenAccept(messageId-> {
                 this.lastSnapshotTimestamps = System.currentTimeMillis();
                 if (log.isDebugEnabled()) {
                     log.debug("[{}]Transaction buffer take snapshot success! "
@@ -532,7 +533,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         return this.takeSnapshotWriter.thenCompose(writer -> {
             TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
             snapshot.setTopicName(topic.getName());
-            return writer.deleteAsync(snapshot);
+            return writer.deleteAsync(snapshot.getTopicName(), snapshot);
         }).thenCompose(__ -> CompletableFuture.completedFuture(null));
     }
 
@@ -645,8 +646,9 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                             this, topic.getName());
                     return;
                 }
-                
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
-                        
.createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
+                
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+                        
.getTxnBufferSnapshotService().createReader(TopicName.get(topic.getName()))
+                        .thenAcceptAsync(reader -> {
                             try {
                                 boolean hasSnapshot = false;
                                 while (reader.hasMoreEvents()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
index 87b8e930a27..d229fbb8f5d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
 import org.apache.bookkeeper.mledger.Entry;
-import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 
 public interface TopicTransactionBufferRecoverCallBack {
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/AbortTxnMetadata.java
similarity index 94%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/AbortTxnMetadata.java
index 5e532d6bff7..7e997111a04 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/AbortTxnMetadata.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/TransactionBufferSnapshot.java
similarity index 95%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/TransactionBufferSnapshot.java
index 59b851e7397..dbe15e81854 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/TransactionBufferSnapshot.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata;
 
 import java.util.List;
 import lombok.AllArgsConstructor;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/package-info.java
similarity index 93%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/package-info.java
index f32853a7d86..74688c857c9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/package-info.java
@@ -19,4 +19,4 @@
 /**
  * The transaction buffer snapshot metadata.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndex.java
similarity index 71%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndex.java
index 5e532d6bff7..bc4101e1c92 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndex.java
@@ -16,23 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
 
 import lombok.AllArgsConstructor;
-import lombok.Getter;
+import lombok.Builder;
+import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.Setter;
 
-/**
- * Abort txn metadata.
- */
+@Builder
+@Data
 @AllArgsConstructor
 @NoArgsConstructor
-@Getter
-@Setter
-public class AbortTxnMetadata {
-    long txnIdMostBits;
-    long txnIdLeastBits;
-    long ledgerId;
-    long entryId;
+public class TransactionBufferSnapshotIndex {
+    public long sequenceID;
+    public long maxReadPositionLedgerID;
+    public long maxReadPositionEntryID;
+    public long persistentPositionLedgerID;
+    public long persistentPositionEntryID;
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
similarity index 78%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
index 59b851e7397..28b2b05a496 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
@@ -16,24 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
 
 import java.util.List;
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 
-/**
- * Transaction buffer snapshot metadata.
- */
 @AllArgsConstructor
 @NoArgsConstructor
 @Getter
 @Setter
-public class TransactionBufferSnapshot {
+@Builder
+public class TransactionBufferSnapshotIndexes {
     private String topicName;
-    private long maxReadPositionLedgerId;
-    private long maxReadPositionEntryId;
-    private List<AbortTxnMetadata> aborts;
+
+    private List<TransactionBufferSnapshotIndex> indexList;
+
+    private TransactionBufferSnapshotSegment snapshot;
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotSegment.java
similarity index 80%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotSegment.java
index 59b851e7397..478ec53ba29 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotSegment.java
@@ -16,24 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
 
 import java.util.List;
 import lombok.AllArgsConstructor;
-import lombok.Getter;
+import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.Setter;
 
-/**
- * Transaction buffer snapshot metadata.
- */
+@Data
 @AllArgsConstructor
 @NoArgsConstructor
-@Getter
-@Setter
-public class TransactionBufferSnapshot {
+public class TransactionBufferSnapshotSegment {
     private String topicName;
+    private long sequenceId;
     private long maxReadPositionLedgerId;
     private long maxReadPositionEntryId;
-    private List<AbortTxnMetadata> aborts;
+    private List<TxnIDData> aborts;
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TxnIDData.java
similarity index 51%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TxnIDData.java
index 59b851e7397..8e565c017ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TxnIDData.java
@@ -16,24 +16,48 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
 
-import java.util.List;
+import java.util.Objects;
 import lombok.AllArgsConstructor;
-import lombok.Getter;
+import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.Setter;
 
-/**
- * Transaction buffer snapshot metadata.
- */
-@AllArgsConstructor
+@Data
 @NoArgsConstructor
-@Getter
-@Setter
-public class TransactionBufferSnapshot {
-    private String topicName;
-    private long maxReadPositionLedgerId;
-    private long maxReadPositionEntryId;
-    private List<AbortTxnMetadata> aborts;
+@AllArgsConstructor
+public class TxnIDData {
+    /** The most significant 64 bits of this TxnID. */
+    private long mostSigBits;
+
+    /** The least significant 64 bits of this TxnID. */
+    private long leastSigBits;
+
+    /**
+     * Renders the ID as {@code (mostSigBits,leastSigBits)}.
+     */
+    @Override
+    public String toString() {
+        return "(" + mostSigBits + "," + leastSigBits + ")";
+    }
+
+    /**
+     * Hashes both halves of the ID, consistent with {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(mostSigBits, leastSigBits);
+    }
+
+    /**
+     * Two IDs are equal when both their most and least significant bits match.
+     *
+     * @param obj the object to compare with; may be {@code null}
+     * @return {@code true} only if {@code obj} is a {@code TxnIDData} with the same two halves
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof TxnIDData other) {
+            // Compare the primitives directly; Objects.equals would box all four longs on every call.
+            return mostSigBits == other.mostSigBits
+                    && leastSigBits == other.leastSigBits;
+        }
+
+        return false;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/package-info.java
similarity index 87%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/package-info.java
index f32853a7d86..02d6e7a716d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/package-info.java
@@ -16,7 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/**
- * The transaction buffer snapshot metadata.
- */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index 91db232d2aa..a0d7801f2fd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.systopic;
 
+import static 
org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getEventKey;
 import static org.mockito.Mockito.mock;
 import com.google.common.collect.Sets;
 import java.util.HashSet;
@@ -121,7 +122,7 @@ public class NamespaceEventsSystemTopicServiceTest extends 
MockedPulsarServiceBa
                 .policies(policies)
                 .build())
             .build();
-        systemTopicClientForNamespace1.newWriter().write(event);
+        systemTopicClientForNamespace1.newWriter().write(getEventKey(event), 
event);
         SystemTopicClient.Reader reader = 
systemTopicClientForNamespace1.newReader();
         Message<PulsarEvent> received = reader.readNext();
         log.info("Receive pulsar event from system topic : {}", 
received.getValue());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index c0afdbee487..c1f6ff16e77 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -28,9 +28,14 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -38,21 +43,30 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
 import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
+import 
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
-import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
+import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -66,9 +80,11 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -82,6 +98,8 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
 
     private static final String RECOVER_COMMIT = NAMESPACE1 + 
"/recover-commit";
     private static final String RECOVER_ABORT = NAMESPACE1 + "/recover-abort";
+    private static final String SNAPSHOT_INDEX = NAMESPACE1 + 
"/snapshot-index";
+    private static final String SNAPSHOT_SEGMENT = NAMESPACE1 + 
"/snapshot-segment";
     private static final String SUBSCRIPTION_NAME = "test-recover";
     private static final String TAKE_SNAPSHOT = NAMESPACE1 + "/take-snapshot";
     private static final String ABORT_DELETE = NAMESPACE1 + "/abort-delete";
@@ -472,60 +490,66 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
 
         PersistentTopic originalTopic = (PersistentTopic) 
getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), 
false).get().get();
-        TransactionBufferSnapshotService transactionBufferSnapshotService =
-                mock(TransactionBufferSnapshotService.class);
+        SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> 
systemTopicTxnBufferSnapshotService =
+                mock(SystemTopicTxnBufferSnapshotService.class);
         SystemTopicClient.Reader<TransactionBufferSnapshot> reader = 
mock(SystemTopicClient.Reader.class);
         SystemTopicClient.Writer<TransactionBufferSnapshot> writer = 
mock(SystemTopicClient.Writer.class);
 
-        
doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
-        
doReturn(CompletableFuture.completedFuture(writer)).when(transactionBufferSnapshotService).createWriter(any());
+        doReturn(CompletableFuture.completedFuture(reader))
+                .when(systemTopicTxnBufferSnapshotService).createReader(any());
+        doReturn(CompletableFuture.completedFuture(writer))
+                .when(systemTopicTxnBufferSnapshotService).createWriter(any());
+        TransactionBufferSnapshotServiceFactory 
transactionBufferSnapshotServiceFactory =
+                mock(TransactionBufferSnapshotServiceFactory.class);
+        doReturn(systemTopicTxnBufferSnapshotService)
+                
.when(transactionBufferSnapshotServiceFactory).getTxnBufferSnapshotService();
         
doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
         
doReturn(CompletableFuture.completedFuture(null)).when(writer).closeAsync();
-        Field field = 
PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
+        Field field = 
PulsarService.class.getDeclaredField("transactionBufferSnapshotServiceFactory");
         field.setAccessible(true);
-        TransactionBufferSnapshotService 
transactionBufferSnapshotServiceOriginal =
-                (TransactionBufferSnapshotService) 
field.get(getPulsarServiceList().get(0));
+        TransactionBufferSnapshotServiceFactory 
transactionBufferSnapshotServiceFactoryOriginal =
+                
((TransactionBufferSnapshotServiceFactory)field.get(getPulsarServiceList().get(0)));
         // mock reader can't read snapshot fail throw RuntimeException
         doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
         // check reader close topic
-        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, 
producer);
+        checkCloseTopic(pulsarClient, 
transactionBufferSnapshotServiceFactoryOriginal,
+                transactionBufferSnapshotServiceFactory, originalTopic, field, 
producer);
         doReturn(true).when(reader).hasMoreEvents();
 
         // mock reader can't read snapshot fail throw PulsarClientException
         doThrow(new 
PulsarClientException("test")).when(reader).hasMoreEvents();
         // check reader close topic
-        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, 
producer);
+        checkCloseTopic(pulsarClient, 
transactionBufferSnapshotServiceFactoryOriginal,
+                transactionBufferSnapshotServiceFactory, originalTopic, field, 
producer);
         doReturn(true).when(reader).hasMoreEvents();
 
         // mock create reader fail
         doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
-                .when(transactionBufferSnapshotService).createReader(any());
+                .when(systemTopicTxnBufferSnapshotService).createReader(any());
         // check create reader fail close topic
         originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), 
false).get().get();
-        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, 
producer);
-        
doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
+        checkCloseTopic(pulsarClient, 
transactionBufferSnapshotServiceFactoryOriginal,
+                transactionBufferSnapshotServiceFactory, originalTopic, field, 
producer);
+        
doReturn(CompletableFuture.completedFuture(reader)).when(systemTopicTxnBufferSnapshotService).createReader(any());
 
         // check create writer fail close topic
         originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), 
false).get().get();
         // mock create writer fail
         doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
-                .when(transactionBufferSnapshotService).createWriter(any());
-        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, 
producer);
+                .when(systemTopicTxnBufferSnapshotService).createWriter(any());
+        checkCloseTopic(pulsarClient, 
transactionBufferSnapshotServiceFactoryOriginal,
+                transactionBufferSnapshotServiceFactory, originalTopic, field, 
producer);
     }
 
     private void checkCloseTopic(PulsarClient pulsarClient,
-                                 TransactionBufferSnapshotService 
transactionBufferSnapshotServiceOriginal,
-                                 TransactionBufferSnapshotService 
transactionBufferSnapshotService,
+                                 TransactionBufferSnapshotServiceFactory 
transactionBufferSnapshotServiceFactoryOriginal,
+                                 TransactionBufferSnapshotServiceFactory 
transactionBufferSnapshotServiceFactory,
                                  PersistentTopic originalTopic,
                                  Field field,
                                  Producer<byte[]> producer) throws Exception {
-        field.set(getPulsarServiceList().get(0), 
transactionBufferSnapshotService);
+        field.set(getPulsarServiceList().get(0), 
transactionBufferSnapshotServiceFactory);
 
         // recover again will throw then close topic
         new TopicTransactionBuffer(originalTopic);
@@ -536,7 +560,7 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
             assertTrue((boolean) close.get(originalTopic));
         });
 
-        field.set(getPulsarServiceList().get(0), 
transactionBufferSnapshotServiceOriginal);
+        field.set(getPulsarServiceList().get(0), 
transactionBufferSnapshotServiceFactoryOriginal);
 
         Transaction txn = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.SECONDS)
@@ -565,4 +589,136 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         assertTrue(stats.getSubscriptions().keySet().contains("__compaction"));
     }
 
+    /**
+     * Round-trips a {@link TransactionBufferSnapshotIndexes} event through the snapshot index system topic
+     * writer and reader and checks that every field survives.
+     */
+    @Test
+    public void testTransactionBufferIndexSystemTopic() throws Exception {
+        SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> indexService =
+                new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotIndexService();
+        TopicName indexTopicName = TopicName.get(SNAPSHOT_INDEX);
+
+        SystemTopicClient.Writer<TransactionBufferSnapshotIndexes> indexesWriter =
+                indexService.createWriter(indexTopicName).get();
+        SystemTopicClient.Reader<TransactionBufferSnapshotIndexes> indexesReader =
+                indexService.createReader(indexTopicName).get();
+        try {
+            // Give each field its own value so a swapped or dropped field cannot go unnoticed.
+            List<TransactionBufferSnapshotIndex> indexList = new LinkedList<>();
+            for (long i = 0; i < 5; i++) {
+                indexList.add(TransactionBufferSnapshotIndex.builder()
+                        .sequenceID(i)
+                        .maxReadPositionLedgerID(i + 10)
+                        .maxReadPositionEntryID(i + 20)
+                        .persistentPositionLedgerID(i + 30)
+                        .persistentPositionEntryID(i + 40)
+                        .build());
+            }
+
+            indexesWriter.write(SNAPSHOT_INDEX,
+                    new TransactionBufferSnapshotIndexes(SNAPSHOT_INDEX, indexList, null));
+
+            assertTrue(indexesReader.hasMoreEvents());
+            TransactionBufferSnapshotIndexes readIndexes = indexesReader.readNext().getValue();
+            assertEquals(readIndexes.getTopicName(), SNAPSHOT_INDEX);
+            assertEquals(readIndexes.getIndexList().size(), 5);
+            assertNull(readIndexes.getSnapshot());
+
+            TransactionBufferSnapshotIndex secondIndex = readIndexes.getIndexList().get(1);
+            assertEquals(secondIndex.getSequenceID(), 1L);
+            assertEquals(secondIndex.getMaxReadPositionLedgerID(), 11L);
+            assertEquals(secondIndex.getMaxReadPositionEntryID(), 21L);
+            assertEquals(secondIndex.getPersistentPositionLedgerID(), 31L);
+            assertEquals(secondIndex.getPersistentPositionEntryID(), 41L);
+        } finally {
+            // Release the system-topic producer and reader even when an assertion fails.
+            indexesWriter.closeAsync().get();
+            indexesReader.closeAsync().get();
+        }
+    }
+
+    /**
+     * Builds the message key for a snapshot segment in the form
+     * {@code multiple-<sequenceId>-<topicName>}.
+     *
+     * @param snapshot segment whose sequence ID and topic name make up the key
+     * @return the key string
+     */
+    public static String buildKey(TransactionBufferSnapshotSegment snapshot) {
+        StringBuilder key = new StringBuilder("multiple-");
+        key.append(snapshot.getSequenceId());
+        key.append('-');
+        key.append(snapshot.getTopicName());
+        return key.toString();
+    }
+
+    @Test
+    public void testTransactionBufferSegmentSystemTopic() throws Exception {
+        // init topic and topicName
+        String snapshotTopic = NAMESPACE1 + "/" + 
EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS;
+        TopicName snapshotSegmentTopicName = 
TopicName.getPartitionedTopicName(snapshotTopic);
+
+        // send a message to create the managed ledger
+        Producer<TransactionBufferSnapshotSegment> producer =
+                pulsarClient.newProducer(Schema.AVRO(
+                                TransactionBufferSnapshotSegment.class))
+                .topic(snapshotTopic)
+                .create();
+
+        // get brokerService and pulsarService
+        PulsarService pulsarService = getPulsarServiceList().get(0);
+        BrokerService brokerService = pulsarService.getBrokerService();
+
+        // create snapshot segment writer
+        SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
+                transactionBufferSnapshotSegmentService =
+                new 
TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotSegmentService();
+
+        SystemTopicClient.Writer<TransactionBufferSnapshotSegment>
+                segmentWriter = 
transactionBufferSnapshotSegmentService.createWriter(snapshotSegmentTopicName).get();
+
+        // write two snapshots to the snapshot segment topic
+        TransactionBufferSnapshotSegment snapshot =
+                new TransactionBufferSnapshotSegment();
+
+        //build and send snapshot
+        snapshot.setTopicName(snapshotTopic);
+        snapshot.setSequenceId(1L);
+        snapshot.setMaxReadPositionLedgerId(2L);
+        snapshot.setMaxReadPositionEntryId(3L);
+        snapshot.setAborts(Collections.singletonList(
+                new TxnIDData(1, 1)));
+
+        segmentWriter.write(buildKey(snapshot), snapshot);
+        snapshot.setSequenceId(2L);
+
+        MessageIdImpl messageId = (MessageIdImpl) 
segmentWriter.write(buildKey(snapshot), snapshot);
+
+        //Create read-only managed ledger
+        //And read the entry and decode entry to snapshot
+        CompletableFuture<Entry> entryCompletableFuture = new 
CompletableFuture<>();
+        AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback = new 
AsyncCallbacks
+                .OpenReadOnlyManagedLedgerCallback() {
+            @Override
+            public void 
openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl 
readOnlyManagedLedger, Object ctx) {
+                readOnlyManagedLedger.asyncReadEntry(
+                        new PositionImpl(messageId.getLedgerId(), 
messageId.getEntryId()),
+                        new AsyncCallbacks.ReadEntryCallback() {
+                            @Override
+                            public void readEntryComplete(Entry entry, Object 
ctx) {
+                                entryCompletableFuture.complete(entry);
+                            }
+
+                            @Override
+                            public void readEntryFailed(ManagedLedgerException 
exception, Object ctx) {
+                                
entryCompletableFuture.completeExceptionally(exception);
+                            }
+                        }, null);
+            }
+
+            /**
+             * Fails the pending entry future so the test reports the open error instead of
+             * blocking forever on {@code entryCompletableFuture.get()}.
+             */
+            @Override
+            public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                entryCompletableFuture.completeExceptionally(exception);
+            }
+        };
+        pulsarService.getManagedLedgerFactory()
+                
.asyncOpenReadOnlyManagedLedger(snapshotSegmentTopicName.getPersistenceNamingEncoding(),
 callback,
+                        
brokerService.getManagedLedgerConfig(snapshotSegmentTopicName).get(),null);
+
+        Entry entry = entryCompletableFuture.get();
+        //decode snapshot from entry
+        ByteBuf headersAndPayload = entry.getDataBuffer();
+        //skip metadata
+        MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+        snapshot = 
Schema.AVRO(TransactionBufferSnapshotSegment.class).decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer());
+
+        //verify snapshot
+        assertEquals(snapshot.getTopicName(), snapshotTopic);
+        assertEquals(snapshot.getSequenceId(), 2L);
+        assertEquals(snapshot.getMaxReadPositionLedgerId(), 2L);
+        assertEquals(snapshot.getMaxReadPositionEntryId(), 3L);
+        assertEquals(snapshot.getAborts().get(0), new TxnIDData(1, 1));
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 952cd9b2c45..67e7fe268a6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -84,8 +84,9 @@ import 
org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.service.BacklogQuotaManager;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
 import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
+import 
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
@@ -96,7 +97,7 @@ import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferPr
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
-import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
@@ -1514,18 +1515,23 @@ public class TransactionTest extends 
TransactionTestBase {
         when(pendingAckStoreProvider.newPendingAckStore(any()))
                 
.thenReturn(CompletableFuture.completedFuture(pendingAckStore));
         // Mock TransactionBufferSnapshotService
-        TransactionBufferSnapshotService transactionBufferSnapshotService
-                = mock(TransactionBufferSnapshotService.class);
-        SystemTopicClient.Writer writer = mock(SystemTopicClient.Writer.class);
+        SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> 
systemTopicTxnBufferSnapshotService
+                = mock(SystemTopicTxnBufferSnapshotService.class);
+        SystemTopicClient.Writer<TransactionBufferSnapshot> writer = 
mock(SystemTopicClient.Writer.class);
         
when(writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
-        when(transactionBufferSnapshotService.createWriter(any()))
+        when(systemTopicTxnBufferSnapshotService.createWriter(any()))
                 .thenReturn(CompletableFuture.completedFuture(writer));
+        TransactionBufferSnapshotServiceFactory 
transactionBufferSnapshotServiceFactory =
+                mock(TransactionBufferSnapshotServiceFactory.class);
+        
when(transactionBufferSnapshotServiceFactory.getTxnBufferSnapshotService())
+                .thenReturn(systemTopicTxnBufferSnapshotService);
+
         // Mock pulsar.
         PulsarService pulsar = mock(PulsarService.class);
         when(pulsar.getConfiguration()).thenReturn(serviceConfiguration);
         when(pulsar.getConfig()).thenReturn(serviceConfiguration);
         
when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
-        
when(pulsar.getTransactionBufferSnapshotService()).thenReturn(transactionBufferSnapshotService);
+        
when(pulsar.getTransactionBufferSnapshotServiceFactory()).thenReturn(transactionBufferSnapshotServiceFactory);
         TopicTransactionBufferProvider topicTransactionBufferProvider = new 
TopicTransactionBufferProvider();
         
when(pulsar.getTransactionBufferProvider()).thenReturn(topicTransactionBufferProvider);
         // Mock BacklogQuotaManager
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java
index b60350e8e39..7093665ade3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java
@@ -31,5 +31,15 @@ public enum EventType {
     /**
      * Transaction buffer snapshot events.
      */
-    TRANSACTION_BUFFER_SNAPSHOT
+    TRANSACTION_BUFFER_SNAPSHOT,
+
+    /**
+     * Transaction buffer snapshot segment events.
+     */
+    TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
+
+    /**
+     * Transaction buffer snapshot indexes events.
+     */
+    TRANSACTION_BUFFER_SNAPSHOT_INDEXES
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index eaab8261460..e3b7b2cf05d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -37,6 +37,15 @@ public class SystemTopicNames {
      */
     public static final String TRANSACTION_BUFFER_SNAPSHOT = 
"__transaction_buffer_snapshot";
 
+    /**
+     * Local topic name for the transaction buffer snapshot segments.
+     */
+    public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS = 
"__transaction_buffer_snapshot_segments";
+
+    /**
+     * Local topic name for the transaction buffer snapshot indexes.
+     */
+    public static final String TRANSACTION_BUFFER_SNAPSHOT_INDEXES = 
"__transaction_buffer_snapshot_indexes";
 
     public static final String PENDING_ACK_STORE_SUFFIX = 
"__transaction_pending_ack";
 
@@ -46,7 +55,8 @@ public class SystemTopicNames {
      * The set of all local topic names declared above.
      */
     public static final Set<String> EVENTS_TOPIC_NAMES =
-            
Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, 
TRANSACTION_BUFFER_SNAPSHOT));
+            
Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, 
TRANSACTION_BUFFER_SNAPSHOT,
+                    TRANSACTION_BUFFER_SNAPSHOT_INDEXES, 
TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
 
 
     public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = 
TopicName.get(TopicDomain.persistent.value(),

Reply via email to