zymap commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r560659329
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1685,6 +1685,14 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
return getLedgerHandle(ledgerId).thenApply(rh ->
rh.getLedgerMetadata().toSafeString());
}
+ @Override
+ public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
+ CompletableFuture<LedgerInfo> result = new CompletableFuture<>();
+ final LedgerInfo ledgerInfo = ledgers.get(ledgerId);
Review comment:
Do we need to check the returned value? It's might be null.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -85,6 +150,34 @@
UUID uid,
Map<String, String> extraMetadata);
+ /**
+ * Begin offload the passed in ledgers to longterm storage, it will finish
+ * when a segment reached it's size or time.
+ * Metadata passed in is for inspection purposes only and should be stored
+ * alongside the segment data.
+ *
+ * When the returned OffloaderHandle.getOffloadResultAsync completes, the
corresponding
+ * ledgers has been persisted to the
+ * longterm storage, so it is safe to delete the original copy in
bookkeeper.
+ *
+ * The uid is used to identify an attempt to offload. The implementation
should
+ * use this to deterministically generate a unique name for the offloaded
object.
+ * This uid will be stored in the managed ledger metadata before
attempting the
+ * call to offload(). If a subsequent or concurrent call to
streamingOffload() finds
Review comment:
streamingoffload()?
##########
File path:
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
##########
@@ -97,21 +97,21 @@ public void testOffloadRead() throws Exception {
assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(1))
- .readOffloaded(anyLong(), any(), anyMap());
+ .readOffloaded(anyLong(), (UUID) any(), anyMap());
Review comment:
any() should be any type, why we need to cast it to the UUID?
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +266,213 @@ public String getOffloadDriverName() {
return promise;
}
+ BlobStore blobStore;
+ String streamingDataBlockKey;
+ String streamingDataIndexKey;
+ MultipartUpload streamingMpu = null;
+ List<MultipartPart> streamingParts = Lists.newArrayList();
+
+ @Override
+ public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml,
UUID uuid, long beginLedger,
+ long beginEntry,
+ Map<String,
String> driverMetadata) {
+ this.ml = ml;
+ this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger,
beginEntry, config.getDriver(),
+ driverMetadata);
+ log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+ this.offloadResult = new CompletableFuture<>();
+ blobStore = blobStores.get(config.getBlobStoreLocation());
+ streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+ streamingDataBlockKey = segmentInfo.uuid.toString();
+ streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
Review comment:
If we have another call for the `streamingOffloading`, do those
variables will impact the previous offload process? Because the following
offload loop still uses the same variables.
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +266,213 @@ public String getOffloadDriverName() {
return promise;
}
+ BlobStore blobStore;
+ String streamingDataBlockKey;
+ String streamingDataIndexKey;
+ MultipartUpload streamingMpu = null;
+ List<MultipartPart> streamingParts = Lists.newArrayList();
+
+ @Override
+ public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml,
UUID uuid, long beginLedger,
+ long beginEntry,
+ Map<String,
String> driverMetadata) {
+ this.ml = ml;
+ this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger,
beginEntry, config.getDriver(),
+ driverMetadata);
+ log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+ this.offloadResult = new CompletableFuture<>();
+ blobStore = blobStores.get(config.getBlobStoreLocation());
+ streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+ streamingDataBlockKey = segmentInfo.uuid.toString();
+ streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+ BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+ DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+ Blob blob = blobBuilder.build();
+ streamingMpu = blobStore
Review comment:
same above.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]