This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7233f0e6616 [improve][offload] Create offload resources lazily (#20775)
7233f0e6616 is described below
commit 7233f0e6616ea54d841be7d17bf2abef4d3827c7
Author: Yong Zhang <[email protected]>
AuthorDate: Thu Jul 13 12:21:14 2023 +0800
[improve][offload] Create offload resources lazily (#20775)
---
.../impl/BlobStoreManagedLedgerOffloader.java | 26 +++++++++++++---------
1 file changed, 16 insertions(+), 10 deletions(-)
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index 7a15c414aa8..03898b032b4 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -154,11 +154,17 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
config.getProvider().getDriver(), config.getServiceEndpoint(),
config.getBucket(), config.getRegion());
- blobStores.putIfAbsent(config.getBlobStoreLocation(),
config.getBlobStore());
this.offloaderStats = offloaderStats;
log.info("The ledger offloader was created.");
}
+ private BlobStore getBlobStore(BlobStoreLocation blobStoreLocation) {
+ return blobStores.computeIfAbsent(blobStoreLocation, location -> {
+ log.info("Creating blob store for location {}", location);
+ return config.getBlobStore();
+ });
+ }
+
@Override
public String getOffloadDriverName() {
return config.getDriver();
@@ -179,11 +185,11 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
Map<String, String> extraMetadata) {
final String managedLedgerName =
extraMetadata.get(MANAGED_LEDGER_NAME);
final String topicName =
TopicName.fromPersistenceNamingEncoding(managedLedgerName);
- final BlobStore writeBlobStore =
blobStores.get(config.getBlobStoreLocation());
- log.info("offload {} uuid {} extraMetadata {} to {} {}",
readHandle.getId(), uuid, extraMetadata,
- config.getBlobStoreLocation(), writeBlobStore);
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(readHandle.getId()).execute(() -> {
+ final BlobStore writeBlobStore =
getBlobStore(config.getBlobStoreLocation());
+ log.info("offload {} uuid {} extraMetadata {} to {} {}",
readHandle.getId(), uuid, extraMetadata,
+ config.getBlobStoreLocation(), writeBlobStore);
if (readHandle.getLength() == 0 || !readHandle.isClosed() ||
readHandle.getLastAddConfirmed() < 0) {
promise.completeExceptionally(
new IllegalArgumentException("An empty or open ledger
should never be offloaded"));
@@ -330,7 +336,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
driverMetadata);
log.debug("begin offload with {}:{}", beginLedger, beginEntry);
this.offloadResult = new CompletableFuture<>();
- blobStore = blobStores.get(config.getBlobStoreLocation());
+ blobStore = getBlobStore(config.getBlobStoreLocation());
streamingIndexBuilder = OffloadIndexBlockV2Builder.create();
streamingDataBlockKey = segmentInfo.uuid.toString();
streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
@@ -536,13 +542,13 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket();
- BlobStore readBlobstore =
blobStores.get(config.getBlobStoreLocation());
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
scheduler.chooseThread(ledgerId).execute(() -> {
try {
+ BlobStore readBlobstore =
getBlobStore(config.getBlobStoreLocation());
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
readBlobstore,
readBucket, key, indexKey,
@@ -562,7 +568,6 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
Map<String, String>
offloadDriverMetadata) {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket();
- BlobStore readBlobstore =
blobStores.get(config.getBlobStoreLocation());
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
final List<MLDataFormats.OffloadSegment> offloadSegmentList =
ledgerContext.getOffloadSegmentList();
List<String> keys = Lists.newLinkedList();
@@ -577,6 +582,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
scheduler.chooseThread(ledgerId).execute(() -> {
try {
+ BlobStore readBlobstore =
getBlobStore(config.getBlobStoreLocation());
promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
readBlobstore,
readBucket, keys, indexKeys,
@@ -596,11 +602,11 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
Map<String, String>
offloadDriverMetadata) {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket(offloadDriverMetadata);
- BlobStore readBlobstore =
blobStores.get(config.getBlobStoreLocation());
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(ledgerId).execute(() -> {
try {
+ BlobStore readBlobstore =
getBlobStore(config.getBlobStoreLocation());
readBlobstore.removeBlobs(readBucket,
ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid),
DataBlockUtils.indexBlockOffloadKey(ledgerId, uid)));
@@ -623,11 +629,11 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
public CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String,
String> offloadDriverMetadata) {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket(offloadDriverMetadata);
- BlobStore readBlobstore =
blobStores.get(config.getBlobStoreLocation());
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.execute(() -> {
try {
+ BlobStore readBlobstore =
getBlobStore(config.getBlobStoreLocation());
readBlobstore.removeBlobs(readBucket,
ImmutableList.of(uid.toString(),
DataBlockUtils.indexBlockOffloadKey(uid)));
@@ -667,7 +673,7 @@ public class BlobStoreManagedLedgerOffloader implements
LedgerOffloader {
String readBucket = bsKey.getBucket();
log.info("Scanning bucket {}, bsKey {}, location {} endpoint{} ",
readBucket, bsKey,
config.getBlobStoreLocation(), endpoint);
- BlobStore readBlobstore =
blobStores.get(config.getBlobStoreLocation());
+ BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation());
int batchSize = 100;
String bucketName = config.getBucket();
String marker = null;