This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 40c8c23123e [improve][broker] Reduce the CPU pressure from the transaction buffer in rolling restarts (#23062)
40c8c23123e is described below

commit 40c8c23123e6c4e1c403445cc98e8fed6997f5b0
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jul 29 20:48:15 2024 +0800

    [improve][broker] Reduce the CPU pressure from the transaction buffer in rolling restarts (#23062)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../SystemTopicTxnBufferSnapshotService.java       |  15 +-
 .../TransactionBufferSnapshotServiceFactory.java   |  26 +-
 .../SingleSnapshotAbortedTxnProcessorImpl.java     |  73 +---
 .../SnapshotSegmentAbortedTxnProcessorImpl.java    | 394 +++++++++------------
 .../broker/transaction/buffer/impl/TableView.java  |  97 +++++
 .../buffer/impl/TopicTransactionBuffer.java        |   5 +-
 .../java/org/apache/pulsar/utils/SimpleCache.java  |  83 +++++
 .../TopicTransactionBufferRecoverTest.java         |  46 ++-
 .../org/apache/pulsar/utils/SimpleCacheTest.java   |  81 +++++
 10 files changed, 486 insertions(+), 336 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c623f5d4e5b..b23851a5ec4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -992,7 +992,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
                 
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
 
-                this.transactionBufferSnapshotServiceFactory = new 
TransactionBufferSnapshotServiceFactory(getClient());
+                this.transactionBufferSnapshotServiceFactory = new 
TransactionBufferSnapshotServiceFactory(this);
 
                 this.transactionTimer =
                         new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-transaction-timer"));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index bd1b9098169..ba6cbee3557 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -22,12 +22,16 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -42,6 +46,8 @@ public class SystemTopicTxnBufferSnapshotService<T> {
     protected final EventType systemTopicType;
 
     private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>> 
refCountedWriterMap;
+    @Getter
+    private final TableView<T> tableView;
 
     // The class ReferenceCountedWriter will maintain the reference count,
     // when the reference count decrement to 0, it will be removed from 
writerFutureMap, the writer will be closed.
@@ -95,13 +101,16 @@ public class SystemTopicTxnBufferSnapshotService<T> {
 
     }
 
-    public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType 
systemTopicType,
-                                               Class<T> schemaType) {
+    public SystemTopicTxnBufferSnapshotService(PulsarService pulsar, EventType 
systemTopicType,
+                                               Class<T> schemaType) throws 
PulsarServerException {
+        final var client = (PulsarClientImpl) pulsar.getClient();
         this.namespaceEventsSystemTopicFactory = new 
NamespaceEventsSystemTopicFactory(client);
         this.systemTopicType = systemTopicType;
         this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
         this.refCountedWriterMap = new ConcurrentHashMap<>();
+        this.tableView = new TableView<>(this::createReader,
+                client.getConfiguration().getOperationTimeoutMs(), 
pulsar.getExecutor());
     }
 
     public CompletableFuture<SystemTopicClient.Reader<T>> 
createReader(TopicName topicName) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
index 4b8548fae47..d54f65572f5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.broker.service;
 
+import lombok.Getter;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.events.EventType;
 
+@Getter
 public class TransactionBufferSnapshotServiceFactory {
 
     private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> 
txnBufferSnapshotService;
@@ -33,29 +36,16 @@ public class TransactionBufferSnapshotServiceFactory {
 
     private 
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> 
txnBufferSnapshotIndexService;
 
-    public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient) {
-        this.txnBufferSnapshotSegmentService = new 
SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+    public TransactionBufferSnapshotServiceFactory(PulsarService pulsar) 
throws PulsarServerException {
+        this.txnBufferSnapshotSegmentService = new 
SystemTopicTxnBufferSnapshotService<>(pulsar,
                 EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
                 TransactionBufferSnapshotSegment.class);
-        this.txnBufferSnapshotIndexService = new 
SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+        this.txnBufferSnapshotIndexService = new 
SystemTopicTxnBufferSnapshotService<>(pulsar,
                 EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, 
TransactionBufferSnapshotIndexes.class);
-        this.txnBufferSnapshotService = new 
SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+        this.txnBufferSnapshotService = new 
SystemTopicTxnBufferSnapshotService<>(pulsar,
                 EventType.TRANSACTION_BUFFER_SNAPSHOT, 
TransactionBufferSnapshot.class);
     }
 
-    public 
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> 
getTxnBufferSnapshotIndexService() {
-        return this.txnBufferSnapshotIndexService;
-    }
-
-    public 
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
-    getTxnBufferSnapshotSegmentService() {
-        return this.txnBufferSnapshotSegmentService;
-    }
-
-    public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> 
getTxnBufferSnapshotService() {
-        return this.txnBufferSnapshotService;
-    }
-
     public void close() throws Exception {
         if (this.txnBufferSnapshotIndexService != null) {
             this.txnBufferSnapshotIndexService.close();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 5c9075e9a38..1649349e3e6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -21,26 +21,19 @@ package org.apache.pulsar.broker.transaction.buffer.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.collections4.map.LinkedMap;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
-import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
 public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcessor {
@@ -91,48 +84,27 @@ public class SingleSnapshotAbortedTxnProcessorImpl 
implements AbortedTxnProcesso
         return aborts.containsKey(txnID);
     }
 
-    private long getSystemClientOperationTimeoutMs() throws Exception {
-        PulsarClientImpl pulsarClient = (PulsarClientImpl) 
topic.getBrokerService().getPulsar().getClient();
-        return pulsarClient.getConfiguration().getOperationTimeoutMs();
-    }
-
     @Override
     public CompletableFuture<Position> recoverFromSnapshot() {
-        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
-                .getTxnBufferSnapshotService()
-                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
-                    try {
-                    Position startReadCursorPosition = null;
-                        while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = 
reader.readNextAsync()
-                                    .get(getSystemClientOperationTimeoutMs(), 
TimeUnit.MILLISECONDS);
-                            if (topic.getName().equals(message.getKey())) {
-                                TransactionBufferSnapshot 
transactionBufferSnapshot = message.getValue();
-                                if (transactionBufferSnapshot != null) {
-                                    handleSnapshot(transactionBufferSnapshot);
-                                    startReadCursorPosition = 
PositionFactory.create(
-                                            
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-                                            
transactionBufferSnapshot.getMaxReadPositionEntryId());
-                                }
-                            }
-                        }
-                        return 
CompletableFuture.completedFuture(startReadCursorPosition);
-                    } catch (TimeoutException ex) {
-                        Throwable t = FutureUtil.unwrapCompletionException(ex);
-                        String errorMessage = String.format("[%s] Transaction 
buffer recover fail by read "
-                                + "transactionBufferSnapshot timeout!", 
topic.getName());
-                        log.error(errorMessage, t);
-                        return FutureUtil.failedFuture(
-                                new 
BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
-                    } catch (Exception ex) {
-                        log.error("[{}] Transaction buffer recover fail when 
read "
-                                + "transactionBufferSnapshot!", 
topic.getName(), ex);
-                        return FutureUtil.failedFuture(ex);
-                    } finally {
-                        closeReader(reader);
-                    }
-                },  
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
-                        .getExecutor(this));
+        final var future = new CompletableFuture<Position>();
+        final var pulsar = topic.getBrokerService().getPulsar();
+        pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() 
-> {
+            try {
+                final var snapshot = 
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+                        .getTableView().readLatest(topic.getName());
+                if (snapshot != null) {
+                    handleSnapshot(snapshot);
+                    final var startReadCursorPosition = 
PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
+                            snapshot.getMaxReadPositionEntryId());
+                    future.complete(startReadCursorPosition);
+                } else {
+                    future.complete(null);
+                }
+            } catch (Throwable e) {
+                future.completeExceptionally(e);
+            }
+        });
+        return future;
     }
 
     @Override
@@ -191,13 +163,6 @@ public class SingleSnapshotAbortedTxnProcessorImpl 
implements AbortedTxnProcesso
         return CompletableFuture.completedFuture(null);
     }
 
-    private void 
closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
-        reader.closeAsync().exceptionally(e -> {
-            log.error("[{}]Transaction buffer reader close error!", 
topic.getName(), e);
-            return null;
-        });
-    }
-
     private void handleSnapshot(TransactionBufferSnapshot snapshot) {
         if (snapshot.getAborts() != null) {
             snapshot.getAborts().forEach(abortTxnMetadata ->
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index e94e7a04779..4ca27f77a87 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -24,11 +24,11 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Supplier;
@@ -54,7 +54,6 @@ import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBuffer
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
 import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -228,220 +227,129 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
 
     @Override
     public CompletableFuture<Position> recoverFromSnapshot() {
-        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
-                .getTxnBufferSnapshotIndexService()
-                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
-                    Position startReadCursorPosition = null;
-                    TransactionBufferSnapshotIndexes persistentSnapshotIndexes 
= null;
-                    try {
-                        /*
-                          Read the transaction snapshot segment index.
-                          <p>
-                              The processor can get the sequence ID, unsealed 
transaction IDs,
-                              segment index list and max read position in the 
snapshot segment index.
-                              Then we can traverse the index list to read all 
aborted transaction IDs
-                              in segments to aborts.
-                          </p>
-                         */
-                        while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshotIndexes> message 
= reader.readNextAsync()
-                                    .get(getSystemClientOperationTimeoutMs(), 
TimeUnit.MILLISECONDS);
-                            if (topic.getName().equals(message.getKey())) {
-                                TransactionBufferSnapshotIndexes 
transactionBufferSnapshotIndexes = message.getValue();
-                                if (transactionBufferSnapshotIndexes != null) {
-                                    persistentSnapshotIndexes = 
transactionBufferSnapshotIndexes;
-                                    startReadCursorPosition = 
PositionFactory.create(
-                                            
transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionLedgerId(),
-                                            
transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionEntryId());
-                                }
-                            }
-                        }
-                    } catch (TimeoutException ex) {
-                        Throwable t = FutureUtil.unwrapCompletionException(ex);
-                        String errorMessage = String.format("[%s] Transaction 
buffer recover fail by read "
-                                + "transactionBufferSnapshot timeout!", 
topic.getName());
-                        log.error(errorMessage, t);
-                        return FutureUtil.failedFuture(
-                                new 
BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
-                    } catch (Exception ex) {
-                        log.error("[{}] Transaction buffer recover fail when 
read "
-                                + "transactionBufferSnapshot!", 
topic.getName(), ex);
-                        return FutureUtil.failedFuture(ex);
-                    } finally {
-                        closeReader(reader);
-                    }
-                    Position finalStartReadCursorPosition = 
startReadCursorPosition;
-                    TransactionBufferSnapshotIndexes 
finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
-                    if (persistentSnapshotIndexes == null) {
-                        return recoverOldSnapshot();
-                    } else {
-                        this.unsealedTxnIds = 
convertTypeToTxnID(persistentSnapshotIndexes
-                                .getSnapshot().getAborts());
-                    }
-                    //Read snapshot segment to recover aborts.
-                    ArrayList<CompletableFuture<Void>> completableFutures = 
new ArrayList<>();
-                    CompletableFuture<Void> 
openManagedLedgerAndHandleSegmentsFuture = new CompletableFuture<>();
-                    AtomicBoolean hasInvalidIndex = new AtomicBoolean(false);
-                    AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback 
= new AsyncCallbacks
-                            .OpenReadOnlyManagedLedgerCallback() {
-                        @Override
-                        public void 
openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl 
readOnlyManagedLedger,
-                                                                      Object 
ctx) {
-                            
finalPersistentSnapshotIndexes.getIndexList().forEach(index -> {
-                                CompletableFuture<Void> handleSegmentFuture = 
new CompletableFuture<>();
-                                completableFutures.add(handleSegmentFuture);
-                                readOnlyManagedLedger.asyncReadEntry(
-                                        
PositionFactory.create(index.getSegmentLedgerID(),
-                                                index.getSegmentEntryID()),
-                                        new AsyncCallbacks.ReadEntryCallback() 
{
-                                            @Override
-                                            public void 
readEntryComplete(Entry entry, Object ctx) {
-                                                
handleSnapshotSegmentEntry(entry);
-                                                
indexes.put(PositionFactory.create(
-                                                                
index.abortedMarkLedgerID,
-                                                                
index.abortedMarkEntryID),
-                                                        index);
-                                                entry.release();
-                                                
handleSegmentFuture.complete(null);
-                                            }
-
-                                            @Override
-                                            public void 
readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                                                /*
-                                                  The logic flow of deleting 
expired segment is:
-                                                  <p>
-                                                      1. delete segment
-                                                      2. update segment index
-                                                  </p>
-                                                  If the worker delete segment 
successfully
-                                                  but failed to update segment 
index,
-                                                  the segment can not be read 
according to the index.
-                                                  We update index again if 
there are invalid indexes.
-                                                 */
-                                                if (((ManagedLedgerImpl) 
topic.getManagedLedger())
-                                                        
.ledgerExists(index.getAbortedMarkLedgerID())) {
-                                                    log.error("[{}] Failed to 
read snapshot segment [{}:{}]",
-                                                            topic.getName(), 
index.segmentLedgerID,
-                                                            
index.segmentEntryID, exception);
-                                                    
handleSegmentFuture.completeExceptionally(exception);
-                                                } else {
-                                                    hasInvalidIndex.set(true);
-                                                }
-                                            }
-
-                                            @Override
-                                            public String toString() {
-                                                return 
String.format("Transaction buffer [%s] recover from snapshot",
-                                                        
SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
-                                            }
-                                        }, null);
-                            });
-                            
openManagedLedgerAndHandleSegmentsFuture.complete(null);
-                        }
+        final var pulsar = topic.getBrokerService().getPulsar();
+        final var future = new CompletableFuture<Position>();
+        pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() 
-> {
+            try {
+                final var indexes = 
pulsar.getTransactionBufferSnapshotServiceFactory()
+                        
.getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());
+                if (indexes == null) {
+                    // Try recovering from the old format snapshot
+                    future.complete(recoverOldSnapshot());
+                    return;
+                }
+                final var snapshot = indexes.getSnapshot();
+                final var startReadCursorPosition = 
PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
+                        snapshot.getMaxReadPositionEntryId());
+                this.unsealedTxnIds = convertTypeToTxnID(snapshot.getAborts());
+                // Read snapshot segment to recover aborts
+                final var snapshotSegmentTopicName = 
TopicName.get(TopicDomain.persistent.toString(),
+                        TopicName.get(topic.getName()).getNamespaceObject(),
+                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+                readSegmentEntries(snapshotSegmentTopicName, indexes);
+                if (!this.indexes.isEmpty()) {
+                    // If there is no segment index, the persistent worker 
will write segment begin from 0.
+                    
persistentWorker.sequenceID.set(this.indexes.get(this.indexes.lastKey()).sequenceID
 + 1);
+                }
+                unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID));
+                future.complete(startReadCursorPosition);
+            } catch (Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        });
+        return future;
+    }
 
-                        @Override
-                        public void 
openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                            log.error("[{}] Failed to open readOnly managed 
ledger", topic, exception);
-                            
openManagedLedgerAndHandleSegmentsFuture.completeExceptionally(exception);
-                        }
-                    };
-
-                    TopicName snapshotSegmentTopicName = 
TopicName.get(TopicDomain.persistent.toString(),
-                            
TopicName.get(topic.getName()).getNamespaceObject(),
-                            
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
-                    
this.topic.getBrokerService().getPulsar().getManagedLedgerFactory()
-                            
.asyncOpenReadOnlyManagedLedger(snapshotSegmentTopicName
-                                            .getPersistenceNamingEncoding(), 
callback,
-                                    topic.getManagedLedger().getConfig(),
-                                    null);
-                    /*
-                       Wait the processor recover completely and then allow TB
-                       to recover the messages after the 
startReadCursorPosition.
-                     */
-                    return openManagedLedgerAndHandleSegmentsFuture
-                            .thenCompose((ignore) -> 
FutureUtil.waitForAll(completableFutures))
-                            .thenCompose((i) -> {
-                                /*
-                                  Update the snapshot segment index if there 
exist invalid indexes.
-                                 */
-                                if (hasInvalidIndex.get()) {
-                                    
persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex,
-                                            () -> 
persistentWorker.updateSnapshotIndex(
-                                                    
finalPersistentSnapshotIndexes.getSnapshot()));
-                                }
-                                /*
-                                   If there is no segment index, the 
persistent worker will write segment begin from 0.
-                                 */
-                                if (indexes.size() != 0) {
-                                    
persistentWorker.sequenceID.set(indexes.get(indexes.lastKey()).sequenceID + 1);
-                                }
-                                /*
-                                  Append the aborted txn IDs in the index 
metadata
-                                  can keep the order of the aborted txn in the 
aborts.
-                                  So that we can trim the expired snapshot 
segment in aborts
-                                  according to the latest transaction IDs in 
the segmentIndex.
-                                 */
-                                unsealedTxnIds.forEach(txnID -> 
aborts.put(txnID, txnID));
-                                return 
CompletableFuture.completedFuture(finalStartReadCursorPosition);
-                            }).exceptionally(ex -> {
-                                log.error("[{}] Failed to recover snapshot 
segment", this.topic.getName(), ex);
-                                return null;
-                            });
-
-                    },  
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
-                        .getExecutor(this));
+    private void readSegmentEntries(TopicName topicName, 
TransactionBufferSnapshotIndexes indexes) throws Exception {
+        final var managedLedger = openReadOnlyManagedLedger(topicName);
+        boolean hasInvalidIndex = false;
+        for (var index : indexes.getIndexList()) {
+            final var position = 
PositionFactory.create(index.getSegmentLedgerID(), index.getSegmentEntryID());
+            final var abortedPosition = 
PositionFactory.create(index.abortedMarkLedgerID, index.abortedMarkEntryID);
+            try {
+                final var entry = readEntry(managedLedger, position);
+                try {
+                    handleSnapshotSegmentEntry(entry);
+                    this.indexes.put(abortedPosition, index);
+                } finally {
+                    entry.release();
+                }
+            } catch (Throwable throwable) {
+                if (((ManagedLedgerImpl) topic.getManagedLedger())
+                        .ledgerExists(index.getAbortedMarkLedgerID())) {
+                    log.error("[{}] Failed to read snapshot segment [{}:{}]",
+                            topic.getName(), index.segmentLedgerID,
+                            index.segmentEntryID, throwable);
+                    throw throwable;
+                } else {
+                    hasInvalidIndex = true;
+                }
+            }
+        }
+        if (hasInvalidIndex) {
+            // Update the snapshot segment index if there exist invalid 
indexes.
+            
persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex,
+                    () -> 
persistentWorker.updateSnapshotIndex(indexes.getSnapshot()));
+        }
+    }
+
+    private ReadOnlyManagedLedgerImpl openReadOnlyManagedLedger(TopicName 
topicName) throws Exception {
+        final var future = new CompletableFuture<ReadOnlyManagedLedgerImpl>();
+        final var callback = new 
AsyncCallbacks.OpenReadOnlyManagedLedgerCallback() {
+            @Override
+            public void 
openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl managedLedger, 
Object ctx) {
+                future.complete(managedLedger);
+            }
+
+            @Override
+            public void openReadOnlyManagedLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
+                future.completeExceptionally(exception);
+            }
+
+            @Override
+            public String toString() {
+                return String.format("Transaction buffer [%s] recover from 
snapshot",
+                        
SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
+            }
+        };
+        
topic.getBrokerService().getPulsar().getManagedLedgerFactory().asyncOpenReadOnlyManagedLedger(
+                topicName.getPersistenceNamingEncoding(), callback, 
topic.getManagedLedger().getConfig(), null);
+        return wait(future, "open read only ml for " + topicName);
+    }
+
+    private Entry readEntry(ReadOnlyManagedLedgerImpl managedLedger, Position 
position) throws Exception {
+        final var future = new CompletableFuture<Entry>();
+        managedLedger.asyncReadEntry(position, new 
AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                future.complete(entry);
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                future.completeExceptionally(exception);
+            }
+        }, null);
+        return wait(future, "read entry from " + position);
     }
 
     // This method will be deprecated and removed in version 4.x.0
-    private CompletableFuture<Position> recoverOldSnapshot() {
-        return 
topic.getBrokerService().getPulsar().getPulsarResources().getTopicResources()
-                
.listPersistentTopicsAsync(NamespaceName.get(TopicName.get(topic.getName()).getNamespace()))
-                .thenCompose(topics -> {
-                    if (!topics.contains(TopicDomain.persistent + "://"
-                            + TopicName.get(topic.getName()).getNamespace() + 
"/"
-                            + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
-                        return CompletableFuture.completedFuture(null);
-                    } else {
-                        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
-                                .getTxnBufferSnapshotService()
-                                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(snapshotReader 
-> {
-                                    Position 
startReadCursorPositionInOldSnapshot = null;
-                                    try {
-                                        while (snapshotReader.hasMoreEvents()) 
{
-                                            Message<TransactionBufferSnapshot> 
message = snapshotReader.readNextAsync()
-                                                    
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
-                                            if 
(topic.getName().equals(message.getKey())) {
-                                                TransactionBufferSnapshot 
transactionBufferSnapshot =
-                                                        message.getValue();
-                                                if (transactionBufferSnapshot 
!= null) {
-                                                    
handleOldSnapshot(transactionBufferSnapshot);
-                                                    
startReadCursorPositionInOldSnapshot = PositionFactory.create(
-                                                            
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
-                                                            
transactionBufferSnapshot.getMaxReadPositionEntryId());
-                                                }
-                                            }
-                                        }
-                                    } catch (TimeoutException ex) {
-                                        Throwable t = 
FutureUtil.unwrapCompletionException(ex);
-                                        String errorMessage = 
String.format("[%s] Transaction buffer recover fail by "
-                                                + "read 
transactionBufferSnapshot timeout!", topic.getName());
-                                        log.error(errorMessage, t);
-                                        return FutureUtil.failedFuture(new 
BrokerServiceException
-                                                
.ServiceUnitNotReadyException(errorMessage, t));
-                                    } catch (Exception ex) {
-                                        log.error("[{}] Transaction buffer 
recover fail when read "
-                                                + 
"transactionBufferSnapshot!", topic.getName(), ex);
-                                        return FutureUtil.failedFuture(ex);
-                                    } finally {
-                                        assert snapshotReader != null;
-                                        closeReader(snapshotReader);
-                                    }
-                                    return 
CompletableFuture.completedFuture(startReadCursorPositionInOldSnapshot);
-                                },
-                                        
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
-                                                .getExecutor(this));
-                    }
-                });
+    private Position recoverOldSnapshot() throws Exception {
+        final var pulsar = topic.getBrokerService().getPulsar();
+        final var topicName = TopicName.get(topic.getName());
+        final var topics = 
wait(pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(
+                NamespaceName.get(topicName.getNamespace())), "list persistent 
topics");
+        if (!topics.contains(TopicDomain.persistent + "://" + 
topicName.getNamespace() + "/"
+                + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
+            return null;
+        }
+        final var snapshot = 
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+                .getTableView().readLatest(topic.getName());
+        if (snapshot == null) {
+            return null;
+        }
+        handleOldSnapshot(snapshot);
+        return PositionFactory.create(snapshot.getMaxReadPositionLedgerId(), 
snapshot.getMaxReadPositionEntryId());
     }
 
     // This method will be deprecated and removed in version 4.x.0
@@ -509,9 +417,17 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
         return pulsarClient.getConfiguration().getOperationTimeoutMs();
     }
 
-    private <T> void  closeReader(SystemTopicClient.Reader<T> reader) {
+    private <R> R wait(CompletableFuture<R> future, String msg) throws 
Exception {
+        try {
+            return future.get(getSystemClientOperationTimeoutMs(), 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw new CompletionException("Failed to " + msg, e.getCause());
+        }
+    }
+
+    private <T> void closeReader(SystemTopicClient.Reader<T> reader) {
         reader.closeAsync().exceptionally(e -> {
-            log.error("[{}]Transaction buffer snapshot reader close error!", 
topic.getName(), e);
+            log.warn("[{}] Failed to close reader: {}", topic.getName(), 
e.getMessage());
             return null;
         });
     }
@@ -838,25 +754,37 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
          * </p>
          */
         private CompletableFuture<Void> clearAllSnapshotSegments() {
-            return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
-                    .getTxnBufferSnapshotSegmentService()
-                    
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
-                        try {
-                            while (reader.hasMoreEvents()) {
-                                Message<TransactionBufferSnapshotSegment> 
message = reader.readNextAsync()
-                                        
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
-                                if 
(topic.getName().equals(message.getValue().getTopicName())) {
-                                   
snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
-                                }
+            final var future = new CompletableFuture<Void>();
+            final var pulsar = topic.getBrokerService().getPulsar();
+            
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+                try {
+                    final var reader = 
wait(pulsar.getTransactionBufferSnapshotServiceFactory()
+                            
.getTxnBufferSnapshotSegmentService().createReader(TopicName.get(topic.getName()))
+                            , "create reader");
+                    try {
+                        while (wait(reader.hasMoreEventsAsync(), "has more 
events")) {
+                            final var message = wait(reader.readNextAsync(), 
"read next");
+                            if 
(topic.getName().equals(message.getValue().getTopicName())) {
+                                
snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
                             }
-                            return CompletableFuture.completedFuture(null);
-                        } catch (Exception ex) {
-                            log.error("[{}] Transaction buffer clear snapshot 
segments fail!", topic.getName(), ex);
-                            return FutureUtil.failedFuture(ex);
-                        } finally {
-                            closeReader(reader);
                         }
-           });
+                        future.complete(null);
+                    } finally {
+                        closeReader(reader);
+                    }
+                } catch (Throwable throwable) {
+                    future.completeExceptionally(throwable);
+                }
+            });
+            return future;
+        }
+
+        private <R> R wait(CompletableFuture<R> future, String msg) throws 
Exception {
+            try {
+                return future.get(getSystemClientOperationTimeoutMs(), 
TimeUnit.MILLISECONDS);
+            } catch (ExecutionException e) {
+                throw new CompletionException("Failed to " + msg, 
e.getCause());
+            }
         }
 
         synchronized CompletableFuture<Void> closeAsync() {
@@ -882,4 +810,4 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
         return segment;
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
new file mode 100644
index 00000000000..7608a393cc9
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import static org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.utils.SimpleCache;
+
+/**
+ * Compared with the more generic {@link 
org.apache.pulsar.client.api.TableView}, this table view
+ * - Provides just a single public method that reads the latest value 
synchronously.
+ * - Maintains multiple long-lived readers that will be expired after some 
time (1 minute by default).
+ */
+@Slf4j
+public class TableView<T> {
+
+    // Remove the cached reader and snapshots if there is no refresh request 
in 1 minute
+    private static final long CACHE_EXPIRE_TIMEOUT_MS = 60 * 1000L;
+    private static final long CACHE_EXPIRE_CHECK_FREQUENCY_MS = 3000L;
+    @VisibleForTesting
+    protected final Function<TopicName, CompletableFuture<Reader<T>>> 
readerCreator;
+    private final Map<String, T> snapshots = new ConcurrentHashMap<>();
+    private final long clientOperationTimeoutMs;
+    private final SimpleCache<NamespaceName, Reader<T>> readers;
+
+    public TableView(Function<TopicName, CompletableFuture<Reader<T>>> 
readerCreator, long clientOperationTimeoutMs,
+                     ScheduledExecutorService executor) {
+        this.readerCreator = readerCreator;
+        this.clientOperationTimeoutMs = clientOperationTimeoutMs;
+        this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS, 
CACHE_EXPIRE_CHECK_FREQUENCY_MS);
+    }
+
+    public T readLatest(String topic) throws Exception {
+        final var reader = getReader(topic);
+        while (wait(reader.hasMoreEventsAsync(), "has more events")) {
+            final var msg = wait(reader.readNextAsync(), "read message");
+            if (msg.getKey() != null) {
+                if (msg.getValue() != null) {
+                    snapshots.put(msg.getKey(), msg.getValue());
+                } else {
+                    snapshots.remove(msg.getKey());
+                }
+            }
+        }
+        return snapshots.get(topic);
+    }
+
+    @VisibleForTesting
+    protected Reader<T> getReader(String topic) {
+        final var topicName = TopicName.get(topic);
+        return readers.get(topicName.getNamespaceObject(), () -> {
+            try {
+                return wait(readerCreator.apply(topicName), "create reader");
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }, __ -> __.closeAsync().exceptionally(e -> {
+            log.warn("Failed to close reader {}", e.getMessage());
+            return null;
+        }));
+    }
+
+    private <R> R wait(CompletableFuture<R> future, String msg) throws 
Exception {
+        try {
+            return future.get(clientOperationTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw new CompletionException("Failed to " + msg, e.getCause());
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index b4662e5fa83..7561457d11f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -632,7 +632,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                         this, topic.getName());
                 return;
             }
-            
abortedTxnProcessor.recoverFromSnapshot().thenAcceptAsync(startReadCursorPosition
 -> {
+            
abortedTxnProcessor.recoverFromSnapshot().thenAccept(startReadCursorPosition -> 
{
                 //Transaction is not use for this topic, so just make 
maxReadPosition as LAC.
                 if (startReadCursorPosition == null) {
                     callBack.noNeedToRecover();
@@ -678,8 +678,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
                 closeCursor(SUBSCRIPTION_NAME);
                 callBack.recoverComplete();
-            }, 
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
-                    .getExecutor(this)).exceptionally(e -> {
+            }).exceptionally(e -> {
                 callBack.recoverExceptionally(e.getCause());
                 log.error("[{}]Transaction buffer failed to recover 
snapshot!", topic.getName(), e);
                 return null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
new file mode 100644
index 00000000000..6a3a6721198
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+
+public class SimpleCache<K, V> {
+
+    private final Map<K, ExpirableValue<V>> cache = new HashMap<>();
+    private final long timeoutMs;
+
+    @RequiredArgsConstructor
+    private class ExpirableValue<V> {
+
+        private final V value;
+        private final Consumer<V> expireCallback;
+        private long deadlineMs;
+
+        boolean tryExpire() {
+            if (System.currentTimeMillis() >= deadlineMs) {
+                expireCallback.accept(value);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        void updateDeadline() {
+            deadlineMs = System.currentTimeMillis() + timeoutMs;
+        }
+    }
+
+    public SimpleCache(final ScheduledExecutorService scheduler, final long 
timeoutMs, final long frequencyMs) {
+        this.timeoutMs = timeoutMs;
+        scheduler.scheduleAtFixedRate(() -> {
+            synchronized (SimpleCache.this) {
+                final var keys = new HashSet<K>();
+                cache.forEach((key, value) -> {
+                    if (value.tryExpire()) {
+                        keys.add(key);
+                    }
+                });
+                cache.keySet().removeAll(keys);
+            }
+        }, frequencyMs, frequencyMs, TimeUnit.MILLISECONDS);
+    }
+
+    public synchronized V get(final K key, final Supplier<V> valueSupplier, 
final Consumer<V> expireCallback) {
+        final var value = cache.get(key);
+        if (value != null) {
+            value.updateDeadline();
+            return value.value;
+        }
+
+        final var newValue = new ExpirableValue<>(valueSupplier.get(), 
expireCallback);
+        newValue.updateDeadline();
+        cache.put(key, newValue);
+        return newValue.value;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index e4240bce700..8ab9d58f570 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -66,6 +66,7 @@ import 
org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
+import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
@@ -90,7 +91,6 @@ import 
org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -582,6 +582,19 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         reader.close();
     }
 
+    static class MockTableView extends TableView<TransactionBufferSnapshot> {
+
+        public MockTableView(PulsarService pulsar) {
+            super(topic -> 
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+                    .createReader(topic), 30000L, pulsar.getExecutor());
+        }
+
+        @Override
+        public SystemTopicClient.Reader<TransactionBufferSnapshot> 
getReader(String topic) {
+            return readerCreator.apply(TopicName.get(topic)).join();
+        }
+    }
+
     @Test(timeOut=30000)
     public void testTransactionBufferRecoverThrowException() throws Exception {
         String topic = NAMESPACE1 + 
"/testTransactionBufferRecoverThrowPulsarClientException";
@@ -612,6 +625,7 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         doReturn(CompletableFuture.completedFuture(reader))
                 .when(systemTopicTxnBufferSnapshotService).createReader(any());
         
doReturn(refCounterWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any());
+        doReturn(new 
MockTableView(pulsarServiceList.get(0))).when(systemTopicTxnBufferSnapshotService).getTableView();
         TransactionBufferSnapshotServiceFactory 
transactionBufferSnapshotServiceFactory =
                 mock(TransactionBufferSnapshotServiceFactory.class);
         doReturn(systemTopicTxnBufferSnapshotService)
@@ -663,7 +677,8 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
                                  PersistentTopic originalTopic,
                                  Field field,
                                  Producer<byte[]> producer) throws Exception {
-        field.set(getPulsarServiceList().get(0), 
transactionBufferSnapshotServiceFactory);
+        final var pulsar = getPulsarServiceList().get(0);
+        field.set(pulsar, transactionBufferSnapshotServiceFactory);
 
         // recover again will throw then close topic
         new TopicTransactionBuffer(originalTopic);
@@ -674,7 +689,7 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
             assertTrue((boolean) close.get(originalTopic));
         });
 
-        field.set(getPulsarServiceList().get(0), 
transactionBufferSnapshotServiceFactoryOriginal);
+        field.set(pulsar, transactionBufferSnapshotServiceFactoryOriginal);
 
         Transaction txn = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.SECONDS)
@@ -684,29 +699,11 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         txn.commit().get();
     }
 
-
-    @Test
-    public void testTransactionBufferNoSnapshotCloseReader() throws Exception{
-        String topic = NAMESPACE1 + "/test";
-        @Cleanup
-        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
-                .topic(topic).sendTimeout(0, 
TimeUnit.SECONDS).enableBatching(false).create();
-
-        admin.topics().unload(topic);
-
-        // unload success, all readers have been closed except for the 
compaction sub
-        producer.send("test");
-        TopicStats stats = admin.topics().getStats(NAMESPACE1 + "/" + 
TRANSACTION_BUFFER_SNAPSHOT);
-
-        // except for the compaction sub
-        assertEquals(stats.getSubscriptions().size(), 1);
-        assertTrue(stats.getSubscriptions().keySet().contains("__compaction"));
-    }
-
     @Test
     public void testTransactionBufferIndexSystemTopic() throws Exception {
+        final var pulsar = pulsarServiceList.get(0);
         SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> 
transactionBufferSnapshotIndexService =
-                new 
TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotIndexService();
+                new 
TransactionBufferSnapshotServiceFactory(pulsar).getTxnBufferSnapshotIndexService();
 
         SystemTopicClient.Writer<TransactionBufferSnapshotIndexes> 
indexesWriter =
                 transactionBufferSnapshotIndexService.getReferenceWriter(
@@ -766,9 +763,10 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         BrokerService brokerService = pulsarService.getBrokerService();
 
         // create snapshot segment writer
+        final var pulsar = pulsarServiceList.get(0);
         SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
                 transactionBufferSnapshotSegmentService =
-                new 
TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotSegmentService();
+                new 
TransactionBufferSnapshotServiceFactory(pulsar).getTxnBufferSnapshotSegmentService();
 
         SystemTopicClient.Writer<TransactionBufferSnapshotSegment>
                 segmentWriter = transactionBufferSnapshotSegmentService
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
new file mode 100644
index 00000000000..c590eda1718
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+public class SimpleCacheTest {
+
+    private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(1);
+
+    @AfterClass
+    public void shutdown() {
+        executor.shutdown();
+    }
+
+    @Test
+    public void testConcurrentUpdate() throws Exception {
+        final var cache = new SimpleCache<Integer, Integer>(executor, 10000L, 
10000L);
+        final var pool = Executors.newFixedThreadPool(2);
+        final var latch = new CountDownLatch(2);
+        for (int i = 0; i < 2; i++) {
+            final var value = i + 100;
+            pool.execute(() -> {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ignored) {
+                }
+                cache.get(0, () -> value, __ -> {});
+                latch.countDown();
+            });
+        }
+        latch.await();
+        final var value = cache.get(0, () -> -1, __ -> {});
+        Assert.assertTrue(value == 100 || value == 101);
+        pool.shutdown();
+    }
+
+    @Test
+    public void testExpire() throws InterruptedException {
+        final var cache = new SimpleCache<Integer, Integer>(executor, 500L, 5);
+        final var expiredValues = Collections.synchronizedSet(new 
HashSet<Integer>());
+
+        final var allKeys = IntStream.range(0, 
5).boxed().collect(Collectors.toSet());
+        allKeys.forEach(key -> cache.get(key, () -> key + 100, 
expiredValues::add));
+
+        Thread.sleep(400L);
+        final var recentAccessedKey = Set.of(1, 2);
+        recentAccessedKey.forEach(key -> cache.get(key, () -> -1, 
expiredValues::add)); // access these keys
+
+        Thread.sleep(300L);
+        recentAccessedKey.forEach(key -> Assert.assertEquals(key + 100, 
cache.get(key, () -> -1, __ -> {})));
+        allKeys.stream().filter(key -> !recentAccessedKey.contains(key))
+                .forEach(key -> Assert.assertEquals(-1, cache.get(key, () -> 
-1, __ -> {})));
+    }
+}

Reply via email to