This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d7f97dd6a7b [improve] [broker] Close temporary open ledger in
BookkeeperBucketSnapshotStorage (#20111)
d7f97dd6a7b is described below
commit d7f97dd6a7b6cf8f063c3c9a17bc6e58eaca1caa
Author: lifepuzzlefun <[email protected]>
AuthorDate: Mon Apr 17 19:32:13 2023 +0800
[improve] [broker] Close temporary open ledger in
BookkeeperBucketSnapshotStorage (#20111)
(cherry picked from commit b50e8802a5224dd68832e263e7046650771a1a4e)
---
.../bucket/BookkeeperBucketSnapshotStorage.java | 33 +++++++++++++++++-----
1 file changed, 26 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index e7d4f9301dd..9c30ccf1c0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -66,22 +66,41 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
@Override
public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
- return openLedger(bucketId).thenCompose(
- ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
- thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+ return openLedger(bucketId).thenCompose(ledgerHandle -> {
+ CompletableFuture<SnapshotMetadata> snapshotFuture =
+ getLedgerEntry(ledgerHandle, 0, 0)
+ .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()));
+
+ snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+ return snapshotFuture;
+ });
}
@Override
public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
long lastSegmentEntryId) {
- return openLedger(bucketId).thenCompose(
- ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId,
- lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+ return openLedger(bucketId).thenCompose(ledgerHandle -> {
+ CompletableFuture<List<SnapshotSegment>> parseFuture =
+ getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
+ .thenApply(this::parseSnapshotSegmentEntries);
+
+ parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+ return parseFuture;
+ });
}
@Override
public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
- return openLedger(bucketId).thenApply(LedgerHandle::getLength);
+ return openLedger(bucketId).thenCompose(ledgerHandle -> {
+ CompletableFuture<Long> lengthFuture =
+ CompletableFuture.completedFuture(ledgerHandle.getLength());
+
+ lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+ return lengthFuture;
+ });
}
@Override