zymap commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r560659329



##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1685,6 +1685,14 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
         return getLedgerHandle(ledgerId).thenApply(rh -> rh.getLedgerMetadata().toSafeString());
     }
 
+    @Override
+    public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
+        CompletableFuture<LedgerInfo> result = new CompletableFuture<>();
+        final LedgerInfo ledgerInfo = ledgers.get(ledgerId);

Review comment:
       Do we need to check the returned value? It might be null.
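
To make the concern concrete, here is a minimal sketch of one way the lookup could guard against a missing entry. `LedgerInfoLookupSketch`, its simplified `LedgerInfo` class, and the static `ledgers` map are hypothetical stand-ins, not the actual `ManagedLedgerImpl` members, and failing the future with `NoSuchElementException` is only one possible policy (completing with `null` and documenting it would be another).

```java
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;

class LedgerInfoLookupSketch {
    // Hypothetical, simplified stand-in for the real LedgerInfo message.
    static final class LedgerInfo {
        final long ledgerId;
        final long entries;

        LedgerInfo(long ledgerId, long entries) {
            this.ledgerId = ledgerId;
            this.entries = entries;
        }
    }

    // Stand-in for the managed ledger's ledgerId -> LedgerInfo map.
    static final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();

    static CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
        LedgerInfo info = ledgers.get(ledgerId);
        if (info == null) {
            // Unknown ledger: fail the future instead of handing callers a null value.
            CompletableFuture<LedgerInfo> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new NoSuchElementException("No ledger info for ledger " + ledgerId));
            return failed;
        }
        return CompletableFuture.completedFuture(info);
    }
}
```

With this shape, callers that chain `thenApply` on the result never see a `null` ledger info; they either get a value or an exceptional completion they can handle explicitly.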

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -85,6 +150,34 @@
                                     UUID uid,
                                     Map<String, String> extraMetadata);
 
+    /**
+     * Begin offload the passed in ledgers to longterm storage, it will finish
+     * when a segment reached it's size or time.
+     * Metadata passed in is for inspection purposes only and should be stored
+     * alongside the segment data.
+     *
+     * When the returned OffloaderHandle.getOffloadResultAsync completes, the corresponding
+     * ledgers has been persisted to the
+     * longterm storage, so it is safe to delete the original copy in bookkeeper.
+     *
+     * The uid is used to identify an attempt to offload. The implementation should
+     * use this to deterministically generate a unique name for the offloaded object.
+     * This uid will be stored in the managed ledger metadata before attempting the
+     * call to offload(). If a subsequent or concurrent call to streamingOffload() finds

Review comment:
       streamingOffload()?

##########
File path: 
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
##########
@@ -97,21 +97,21 @@ public void testOffloadRead() throws Exception {
             assertEquals(new String(e.getData()), "entry-" + i++);
         }
         verify(offloader, times(1))
-            .readOffloaded(anyLong(), any(), anyMap());
+                .readOffloaded(anyLong(), (UUID) any(), anyMap());

Review comment:
       `any()` should match any type. Why do we need to cast it to `UUID`?
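
One possible explanation, offered as an assumption rather than something this diff hunk confirms: if `readOffloaded` has gained a second overload whose middle parameter is a different type, a bare generic `any()` no longer tells the compiler which overload is meant. The sketch below reproduces that situation in plain Java without Mockito. `MatcherCastSketch`, its local `any()`, and the `StringBuilder` overload are hypothetical stand-ins.

```java
import java.util.Map;
import java.util.UUID;

class MatcherCastSketch {
    // Stand-in with the same shape as a generic matcher factory: <T> T any().
    static <T> T any() {
        return null;
    }

    // Two overloads that differ only in the type of the second parameter.
    static String readOffloaded(long ledgerId, UUID uid, Map<String, String> metadata) {
        return "uuid";
    }

    static String readOffloaded(long ledgerId, StringBuilder context, Map<String, String> metadata) {
        return "context";
    }

    static String callUuidOverload() {
        // readOffloaded(1L, any(), null) would be rejected by the compiler as ambiguous;
        // the cast fixes the argument type and therefore the overload.
        return readOffloaded(1L, (UUID) any(), null);
    }

    static String callContextOverload() {
        // An explicit type argument works as well as a cast.
        return readOffloaded(1L, MatcherCastSketch.<StringBuilder>any(), null);
    }
}
```

If that is indeed the reason, Mockito's `any(UUID.class)` may read more clearly than the cast, with the caveat that in recent Mockito versions it does not match `null` arguments.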

##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +266,213 @@ public String getOffloadDriverName() {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(),
+                driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);

Review comment:
       If `streamingOffload` is called again, will these variables affect the
previous offload process? The offload loop that follows still uses the same
variables.
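
A small sketch of the hazard being asked about, using hypothetical classes (`SharedFieldOffloader`, `PerCallOffloader`) rather than the real `BlobStoreManagedLedgerOffloader`: when per-offload state lives in instance fields, a second call overwrites what the first call's later work reads, whereas state captured per call stays isolated.

```java
import java.util.UUID;
import java.util.function.Supplier;

class StreamingStateSketch {
    // Field-based state: every call to begin() overwrites the same field.
    static class SharedFieldOffloader {
        private String dataBlockKey;

        Supplier<String> begin(UUID uuid) {
            dataBlockKey = uuid.toString();
            // Later work reads the field, so it sees whatever the latest call stored.
            return () -> dataBlockKey;
        }
    }

    // Per-call state: each call keeps its own key in a local variable.
    static class PerCallOffloader {
        Supplier<String> begin(UUID uuid) {
            final String dataBlockKey = uuid.toString();
            // Later work reads the value captured for this call only.
            return () -> dataBlockKey;
        }
    }
}
```

In the shared-field variant, the supplier returned by the first `begin` call yields the second call's key once `begin` has been invoked again. Grouping the streaming fields into a per-offload context object, or rejecting a new call while one is in progress, would be two ways to avoid that.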

##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +266,213 @@ public String getOffloadDriverName() {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(),
+                driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore

Review comment:
       Same as above.





