This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3c4a1ce026db0602cda1d3c65ce9211e9e88f02b Author: Cong Zhao <zhaoc...@apache.org> AuthorDate: Thu Apr 20 09:26:30 2023 +0800 [improve][broker] Cache LedgerHandle in BookkeeperBucketSnapshotStorage (#20117) (cherry picked from commit d3fa998aa7c0a7a9452079ef2ff05bccf6b273cf) --- .../bucket/BookkeeperBucketSnapshotStorage.java | 52 +++++++++++----------- .../BookkeeperBucketSnapshotStorageTest.java | 43 ++++++++++++++++++ 2 files changed, 69 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index 9c30ccf1c0b..18a4c322f7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import javax.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; @@ -48,6 +49,8 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage { private final ServiceConfiguration config; private BookKeeper bookKeeper; + private final Map<Long, CompletableFuture<LedgerHandle>> ledgerHandleFutureCache = new ConcurrentHashMap<>(); + public BookkeeperBucketSnapshotStorage(PulsarService pulsar) { this.pulsar = pulsar; this.config = pulsar.getConfig(); @@ -66,45 +69,30 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage { @Override public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) { - return openLedger(bucketId).thenCompose(ledgerHandle -> { - CompletableFuture<SnapshotMetadata> snapshotFuture = - getLedgerEntry(ledgerHandle, 0, 0) - .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())); - - snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); - - return snapshotFuture; - }); + return getLedgerHandle(bucketId).thenCompose(ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0) + .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()))); } @Override public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId, long lastSegmentEntryId) { - return openLedger(bucketId).thenCompose(ledgerHandle -> { - CompletableFuture<List<SnapshotSegment>> parseFuture = - getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId) - .thenApply(this::parseSnapshotSegmentEntries); - - parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); - - return parseFuture; - }); + return getLedgerHandle(bucketId).thenCompose( + ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId) + .thenApply(this::parseSnapshotSegmentEntries)); } @Override public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) { - return openLedger(bucketId).thenCompose(ledgerHandle -> { - CompletableFuture<Long> lengthFuture = - CompletableFuture.completedFuture(ledgerHandle.getLength()); - - lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); - - return lengthFuture; - }); + return getLedgerHandle(bucketId).thenCompose( + ledgerHandle -> CompletableFuture.completedFuture(ledgerHandle.getLength())); } @Override public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) { + CompletableFuture<LedgerHandle> ledgerHandleFuture = ledgerHandleFutureCache.remove(bucketId); + if (ledgerHandleFuture != null) { + ledgerHandleFuture.whenComplete((lh, ex) -> closeLedger(lh)); + } return deleteLedger(bucketId); } @@ -178,6 +166,18 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage { return future; } + private CompletableFuture<LedgerHandle> getLedgerHandle(Long ledgerId) { + CompletableFuture<LedgerHandle> ledgerHandleCompletableFuture = + ledgerHandleFutureCache.computeIfAbsent(ledgerId, k -> openLedger(ledgerId)); + // remove future of completed exceptionally + ledgerHandleCompletableFuture.whenComplete((__, ex) -> { + if (ex != null) { + ledgerHandleFutureCache.remove(ledgerId, ledgerHandleCompletableFuture); + } + }); + return ledgerHandleCompletableFuture; + } + private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) { final CompletableFuture<LedgerHandle> future = new CompletableFuture<>(); bookKeeper.asyncOpenLedger( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java index a628b58e10d..7cb6b8d5865 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; +import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -204,4 +205,46 @@ public class BookkeeperBucketSnapshotStorageTest extends MockedPulsarServiceBase Assert.assertTrue(bucketSnapshotLength > 0L); } + @Test + public void testConcurrencyGet() throws ExecutionException, InterruptedException { + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata segmentMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata.newBuilder() + .setMinScheduleTimestamp(System.currentTimeMillis()) + .setMaxScheduleTimestamp(System.currentTimeMillis()) + .putDelayedIndexBitMap(100L, ByteString.copyFrom(new byte[1])).build(); + + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata = + DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder() + .addMetadataList(segmentMetadata) + .build(); + List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>(); + + long timeMillis = System.currentTimeMillis(); + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex = + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setLedgerId(100L).setEntryId(10L) + .setTimestamp(timeMillis).build(); + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment = + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment.newBuilder().addIndexes(delayedIndex).build(); + bucketSnapshotSegments.add(snapshotSegment); + bucketSnapshotSegments.add(snapshotSegment); + + CompletableFuture<Long> future = + bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, + bucketSnapshotSegments, UUID.randomUUID().toString(), TOPIC_NAME, CURSOR_NAME); + Long bucketId = future.get(); + Assert.assertNotNull(bucketId); + + List<CompletableFuture<Void>> futures = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + CompletableFuture<Void> future0 = CompletableFuture.runAsync(() -> { + List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> list = + bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, 1, 3).join(); + Assert.assertTrue(list.size() > 0); + }); + futures.add(future0); + } + + FutureUtil.waitForAll(futures).join(); + } + }