This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7233f0e6616 [improve][offload] Create offload resources lazily (#20775)
7233f0e6616 is described below

commit 7233f0e6616ea54d841be7d17bf2abef4d3827c7
Author: Yong Zhang <[email protected]>
AuthorDate: Thu Jul 13 12:21:14 2023 +0800

    [improve][offload] Create offload resources lazily (#20775)
---
 .../impl/BlobStoreManagedLedgerOffloader.java      | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 7a15c414aa8..03898b032b4 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -154,11 +154,17 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                 config.getProvider().getDriver(), config.getServiceEndpoint(),
                 config.getBucket(), config.getRegion());
 
-        blobStores.putIfAbsent(config.getBlobStoreLocation(), 
config.getBlobStore());
         this.offloaderStats = offloaderStats;
         log.info("The ledger offloader was created.");
     }
 
+    private BlobStore getBlobStore(BlobStoreLocation blobStoreLocation) {
+        return blobStores.computeIfAbsent(blobStoreLocation, location -> {
+            log.info("Creating blob store for location {}", location);
+            return config.getBlobStore();
+        });
+    }
+
     @Override
     public String getOffloadDriverName() {
         return config.getDriver();
@@ -179,11 +185,11 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                                            Map<String, String> extraMetadata) {
         final String managedLedgerName = 
extraMetadata.get(MANAGED_LEDGER_NAME);
         final String topicName = 
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
-        final BlobStore writeBlobStore = 
blobStores.get(config.getBlobStoreLocation());
-        log.info("offload {} uuid {} extraMetadata {} to {} {}", 
readHandle.getId(), uuid, extraMetadata,
-                config.getBlobStoreLocation(), writeBlobStore);
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.chooseThread(readHandle.getId()).execute(() -> {
+            final BlobStore writeBlobStore = 
getBlobStore(config.getBlobStoreLocation());
+            log.info("offload {} uuid {} extraMetadata {} to {} {}", 
readHandle.getId(), uuid, extraMetadata,
+                config.getBlobStoreLocation(), writeBlobStore);
             if (readHandle.getLength() == 0 || !readHandle.isClosed() || 
readHandle.getLastAddConfirmed() < 0) {
                 promise.completeExceptionally(
                         new IllegalArgumentException("An empty or open ledger 
should never be offloaded"));
@@ -330,7 +336,7 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                 driverMetadata);
         log.debug("begin offload with {}:{}", beginLedger, beginEntry);
         this.offloadResult = new CompletableFuture<>();
-        blobStore = blobStores.get(config.getBlobStoreLocation());
+        blobStore = getBlobStore(config.getBlobStoreLocation());
         streamingIndexBuilder = OffloadIndexBlockV2Builder.create();
         streamingDataBlockKey = segmentInfo.uuid.toString();
         streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
@@ -536,13 +542,13 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
 
         BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
         String readBucket = bsKey.getBucket();
-        BlobStore readBlobstore = 
blobStores.get(config.getBlobStoreLocation());
 
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
         String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
         scheduler.chooseThread(ledgerId).execute(() -> {
             try {
+                BlobStore readBlobstore = 
getBlobStore(config.getBlobStoreLocation());
                 
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
                         readBlobstore,
                         readBucket, key, indexKey,
@@ -562,7 +568,6 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                                                        Map<String, String> 
offloadDriverMetadata) {
         BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
         String readBucket = bsKey.getBucket();
-        BlobStore readBlobstore = 
blobStores.get(config.getBlobStoreLocation());
         CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
         final List<MLDataFormats.OffloadSegment> offloadSegmentList = 
ledgerContext.getOffloadSegmentList();
         List<String> keys = Lists.newLinkedList();
@@ -577,6 +582,7 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
 
         scheduler.chooseThread(ledgerId).execute(() -> {
             try {
+                BlobStore readBlobstore = 
getBlobStore(config.getBlobStoreLocation());
                 
promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
                         readBlobstore,
                         readBucket, keys, indexKeys,
@@ -596,11 +602,11 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
                                                    Map<String, String> 
offloadDriverMetadata) {
         BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
         String readBucket = bsKey.getBucket(offloadDriverMetadata);
-        BlobStore readBlobstore = 
blobStores.get(config.getBlobStoreLocation());
 
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.chooseThread(ledgerId).execute(() -> {
             try {
+                BlobStore readBlobstore = 
getBlobStore(config.getBlobStoreLocation());
                 readBlobstore.removeBlobs(readBucket,
                     
ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid),
                                      
DataBlockUtils.indexBlockOffloadKey(ledgerId, uid)));
@@ -623,11 +629,11 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
     public CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String, 
String> offloadDriverMetadata) {
         BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
         String readBucket = bsKey.getBucket(offloadDriverMetadata);
-        BlobStore readBlobstore = 
blobStores.get(config.getBlobStoreLocation());
 
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.execute(() -> {
             try {
+                BlobStore readBlobstore = 
getBlobStore(config.getBlobStoreLocation());
                 readBlobstore.removeBlobs(readBucket,
                         ImmutableList.of(uid.toString(),
                                 DataBlockUtils.indexBlockOffloadKey(uid)));
@@ -667,7 +673,7 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         String readBucket = bsKey.getBucket();
         log.info("Scanning bucket {}, bsKey {}, location {} endpoint{} ", 
readBucket, bsKey,
                 config.getBlobStoreLocation(), endpoint);
-        BlobStore readBlobstore = 
blobStores.get(config.getBlobStoreLocation());
+        BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation());
         int batchSize = 100;
         String bucketName = config.getBucket();
         String marker = null;

Reply via email to