eolivelli commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r556387333



##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +260,183 @@ public String getOffloadDriverName() {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = new StreamingOffloadIndexBlockBuilderImpl();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, segmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);

Review comment:
       ok, thanks for your clarification

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -85,6 +181,34 @@
                                     UUID uid,
                                     Map<String, String> extraMetadata);
 
+    /**
+     * Begin offloading the passed-in ledgers to long-term storage; the offload finishes
+     * when a segment reaches its size or time limit.
+     * Metadata passed in is for inspection purposes only and should be stored
+     * alongside the segment data.
+     *
+     * When the returned OffloadHandle.getOffloadResultAsync completes, the corresponding
+     * ledgers have been persisted to
+     * long-term storage, so it is safe to delete the original copy in BookKeeper.
+     *
+     * The uid is used to identify an attempt to offload. The implementation should
+     * use this to deterministically generate a unique name for the offloaded object.
+     * This uid will be stored in the managed ledger metadata before attempting the
+     * call to offload(). If a subsequent or concurrent call to streamingOffload() finds
+     * a uid in the metadata, it will attempt to clean up this attempt with a call
+     * to #deleteOffloaded(ReadHandle,UUID). Once the offload attempt completes,
+     * the managed ledger will update its metadata again, to record the completion,
+     * ensuring that subsequent calls will not attempt to offload the same ledger
+     * again.
+     *
+     * @return an OffloadHandle; when its `completeFuture()` completes, the offload has been successful.
+     */
+    default CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                              long beginEntry,

Review comment:
       works for me, thanks
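
For readers following this thread, the uid protocol described in the Javadoc above could be sketched roughly as follows. This is a simplified in-memory illustration and not the PR's implementation: the class `OffloadUidSketch`, its fields, and the `succeed` flag are hypothetical stand-ins for the managed ledger metadata and the blob store. The only details taken from the diff are that the data object is named after the uuid and the index object uses a `-index` suffix.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class OffloadUidSketch {
    // Hypothetical stand-in for managed ledger metadata: ledger id -> uid of an unfinished attempt.
    static final Map<Long, UUID> pendingAttempts = new HashMap<>();
    // Hypothetical stand-in for the "offload completed" marker in the metadata.
    static final Set<Long> completedLedgers = new HashSet<>();
    // Hypothetical stand-in for the objects held in long-term storage.
    static final Set<String> storedObjects = new HashSet<>();

    // Object names are derived deterministically from the uid, as in the diff.
    static String dataKey(UUID uid) {
        return uid.toString();
    }

    static String indexKey(UUID uid) {
        return String.format("%s-index", uid);
    }

    // Simulates one offload attempt; 'succeed' is an assumption used to fake a failure.
    static CompletableFuture<Void> offload(long ledgerId, UUID uid, boolean succeed) {
        if (completedLedgers.contains(ledgerId)) {
            // Completion was already recorded, so the ledger is not offloaded again.
            return CompletableFuture.completedFuture(null);
        }
        UUID stale = pendingAttempts.get(ledgerId);
        if (stale != null) {
            // A uid left in the metadata marks an earlier attempt; clean it up first
            // (this plays the role of deleteOffloaded in the Javadoc).
            storedObjects.remove(dataKey(stale));
            storedObjects.remove(indexKey(stale));
        }
        // Record the uid before writing anything to storage.
        pendingAttempts.put(ledgerId, uid);

        CompletableFuture<Void> result = new CompletableFuture<>();
        storedObjects.add(dataKey(uid)); // data may be partially written at this point
        if (!succeed) {
            result.completeExceptionally(new IllegalStateException("simulated offload failure"));
            return result;
        }
        storedObjects.add(indexKey(uid));
        // Update the metadata again to record completion.
        pendingAttempts.remove(ledgerId);
        completedLedgers.add(ledgerId);
        result.complete(null);
        return result;
    }
}
```

In this sketch a failed attempt leaves its uid in `pendingAttempts`, so the next call for the same ledger removes the leftover objects before starting under a fresh uid.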




