This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d7f97dd6a7b [improve] [broker] Close temporary open ledger in BookkeeperBucketSnapshotStorage (#20111)
d7f97dd6a7b is described below

commit d7f97dd6a7b6cf8f063c3c9a17bc6e58eaca1caa
Author: lifepuzzlefun <[email protected]>
AuthorDate: Mon Apr 17 19:32:13 2023 +0800

    [improve] [broker] Close temporary open ledger in BookkeeperBucketSnapshotStorage (#20111)
    
    (cherry picked from commit b50e8802a5224dd68832e263e7046650771a1a4e)
---
 .../bucket/BookkeeperBucketSnapshotStorage.java    | 33 +++++++++++++++++-----
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index e7d4f9301dd..9c30ccf1c0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -66,22 +66,41 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
 
     @Override
     public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
-        return openLedger(bucketId).thenCompose(
-                ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
-                        thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
+        return openLedger(bucketId).thenCompose(ledgerHandle -> {
+            CompletableFuture<SnapshotMetadata> snapshotFuture =
+                    getLedgerEntry(ledgerHandle, 0, 0)
+                            .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()));
+
+            snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+            return snapshotFuture;
+        });
     }
 
     @Override
     public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
                                                                              long lastSegmentEntryId) {
-        return openLedger(bucketId).thenCompose(
-                ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId,
-                        lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
+        return openLedger(bucketId).thenCompose(ledgerHandle -> {
+            CompletableFuture<List<SnapshotSegment>> parseFuture =
+                    getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
+                            .thenApply(this::parseSnapshotSegmentEntries);
+
+            parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+            return parseFuture;
+        });
     }
 
     @Override
     public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
-        return openLedger(bucketId).thenApply(LedgerHandle::getLength);
+        return openLedger(bucketId).thenCompose(ledgerHandle -> {
+            CompletableFuture<Long> lengthFuture =
+                    CompletableFuture.completedFuture(ledgerHandle.getLength());
+
+            lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));
+
+            return lengthFuture;
+        });
     }
 
     @Override

Reply via email to