This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c4a1ce026db0602cda1d3c65ce9211e9e88f02b
Author: Cong Zhao <zhaoc...@apache.org>
AuthorDate: Thu Apr 20 09:26:30 2023 +0800

    [improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117)
    
    (cherry picked from commit d3fa998aa7c0a7a9452079ef2ff05bccf6b273cf)
---
 .../bucket/BookkeeperBucketSnapshotStorage.java    | 52 +++++++++++-----------
 .../BookkeeperBucketSnapshotStorageTest.java       | 43 ++++++++++++++++++
 2 files changed, 69 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index 9c30ccf1c0b..18a4c322f7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.validation.constraints.NotNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
@@ -48,6 +49,8 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
     private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
 
+    private final Map<Long, CompletableFuture<LedgerHandle>> 
ledgerHandleFutureCache = new ConcurrentHashMap<>();
+
     public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.config = pulsar.getConfig();
@@ -66,45 +69,30 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
 
     @Override
     public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long 
bucketId) {
-        return openLedger(bucketId).thenCompose(ledgerHandle -> {
-            CompletableFuture<SnapshotMetadata> snapshotFuture =
-                    getLedgerEntry(ledgerHandle, 0, 0)
-                            .thenApply(entryEnumeration -> 
parseSnapshotMetadataEntry(entryEnumeration.nextElement()));
-
-            snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-            return snapshotFuture;
-        });
+        return getLedgerHandle(bucketId).thenCompose(ledgerHandle -> 
getLedgerEntry(ledgerHandle, 0, 0)
+                .thenApply(entryEnumeration -> 
parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
     }
 
     @Override
     public CompletableFuture<List<SnapshotSegment>> 
getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
                                                                              
long lastSegmentEntryId) {
-        return openLedger(bucketId).thenCompose(ledgerHandle -> {
-            CompletableFuture<List<SnapshotSegment>> parseFuture =
-                    getLedgerEntry(ledgerHandle, firstSegmentEntryId, 
lastSegmentEntryId)
-                            .thenApply(this::parseSnapshotSegmentEntries);
-
-            parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-            return parseFuture;
-        });
+        return getLedgerHandle(bucketId).thenCompose(
+                ledgerHandle -> getLedgerEntry(ledgerHandle, 
firstSegmentEntryId, lastSegmentEntryId)
+                        .thenApply(this::parseSnapshotSegmentEntries));
     }
 
     @Override
     public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
-        return openLedger(bucketId).thenCompose(ledgerHandle -> {
-            CompletableFuture<Long> lengthFuture =
-                    
CompletableFuture.completedFuture(ledgerHandle.getLength());
-
-            lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
-
-            return lengthFuture;
-        });
+        return getLedgerHandle(bucketId).thenCompose(
+                ledgerHandle -> 
CompletableFuture.completedFuture(ledgerHandle.getLength()));
     }
 
     @Override
     public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
+        CompletableFuture<LedgerHandle> ledgerHandleFuture = 
ledgerHandleFutureCache.remove(bucketId);
+        if (ledgerHandleFuture != null) {
+            ledgerHandleFuture.whenComplete((lh, ex) -> closeLedger(lh));
+        }
         return deleteLedger(bucketId);
     }
 
@@ -178,6 +166,18 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
         return future;
     }
 
+    private CompletableFuture<LedgerHandle> getLedgerHandle(Long ledgerId) {
+        CompletableFuture<LedgerHandle> ledgerHandleCompletableFuture =
+                ledgerHandleFutureCache.computeIfAbsent(ledgerId, k -> 
openLedger(ledgerId));
+        // remove future of completed exceptionally
+        ledgerHandleCompletableFuture.whenComplete((__, ex) -> {
+            if (ex != null) {
+                ledgerHandleFutureCache.remove(ledgerId, 
ledgerHandleCompletableFuture);
+            }
+        });
+        return ledgerHandleCompletableFuture;
+    }
+
     private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
         final CompletableFuture<LedgerHandle> future = new 
CompletableFuture<>();
         bookKeeper.asyncOpenLedger(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index a628b58e10d..7cb6b8d5865 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
 import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -204,4 +205,46 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
         Assert.assertTrue(bucketSnapshotLength > 0L);
     }
 
+    @Test
+    public void testConcurrencyGet() throws ExecutionException, 
InterruptedException {
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata 
segmentMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder()
+                        .setMinScheduleTimestamp(System.currentTimeMillis())
+                        .setMaxScheduleTimestamp(System.currentTimeMillis())
+                        .putDelayedIndexBitMap(100L, ByteString.copyFrom(new 
byte[1])).build();
+
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata 
snapshotMetadata =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder()
+                        .addMetadataList(segmentMetadata)
+                        .build();
+        List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
bucketSnapshotSegments = new ArrayList<>();
+
+        long timeMillis = System.currentTimeMillis();
+        DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
+                
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L)
+                        .setTimestamp(timeMillis).build();
+        DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment 
snapshotSegment =
+                
DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build();
+        bucketSnapshotSegments.add(snapshotSegment);
+        bucketSnapshotSegments.add(snapshotSegment);
+
+        CompletableFuture<Long> future =
+                bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
+                        bucketSnapshotSegments, UUID.randomUUID().toString(), 
TOPIC_NAME, CURSOR_NAME);
+        Long bucketId = future.get();
+        Assert.assertNotNull(bucketId);
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            CompletableFuture<Void> future0 = CompletableFuture.runAsync(() -> 
{
+                List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> 
list =
+                        
bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join();
+                Assert.assertTrue(list.size() > 0);
+            });
+            futures.add(future0);
+        }
+
+        FutureUtil.waitForAll(futures).join();
+    }
+
 }

Reply via email to