Renkai commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r560213581



##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public OfferEntryResult offerEntry(Entry entry) {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+
+            @Override
+            public boolean close() {
+                return BlobStoreManagedLedgerOffloader.this.closeSegment();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+                return;
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    log.warn("streaming offload loop interrupted while waiting for entries", e);
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getClosedLedgerInfo(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
+        Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: 
{}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, 
streamingMpu.id());
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            log.debug("segment closed, buffer is empty");
+            try {
+                blobStore.completeMultipartUpload(streamingMpu, streamingParts);

Review comment:
       > Is `streamingParts` cleared automatically after the upload completes? I can't find any place that clears `streamingParts`. BTW, it seems `streamingParts` could be a local variable.
   
   As we discussed earlier, a new offloader is created for each segment to be offloaded, so `streamingParts` will be garbage collected along with the offloader once the segment is done.
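   
   For illustration, a minimal sketch of the alternative the question raises, with the part list as a local variable whose lifetime ends with the upload. The jclouds `BlobStore` multipart calls are the real API; `isDone()` and `nextPayload()` are hypothetical stand-ins for the PR's buffering logic, so this is a shape sketch, not the actual implementation:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   import org.jclouds.blobstore.BlobStore;
   import org.jclouds.blobstore.domain.MultipartPart;
   import org.jclouds.blobstore.domain.MultipartUpload;
   import org.jclouds.io.Payload;
   
   // Sketch only: the part list as a local threaded through one upload method.
   class LocalPartsSketch {
       private final BlobStore blobStore;
       private final MultipartUpload mpu;
   
       LocalPartsSketch(BlobStore blobStore, MultipartUpload mpu) {
           this.blobStore = blobStore;
           this.mpu = mpu;
       }
   
       // Returns the ETag of the completed multipart upload.
       String offloadSegment() {
           List<MultipartPart> parts = new ArrayList<>(); // local, not a field
           int partId = 1;
           while (!isDone()) {
               // Each payload corresponds to one BufferedOffloadStream block in the PR.
               Payload payload = nextPayload();
               parts.add(blobStore.uploadMultipartPart(mpu, partId++, payload));
           }
           // parts becomes unreachable as soon as this method returns,
           // independent of how long the offloader object itself stays alive.
           return blobStore.completeMultipartUpload(mpu, parts);
       }
   
       // Hypothetical stand-ins for segmentInfo.isClosed() / the buffered stream.
       private boolean isDone() { return true; }
       private Payload nextPayload() { return null; }
   }
   ```
   
   The trade-off is that the PR drives the upload from a scheduled, re-entrant loop (`streamingOffloadLoop`), where state has to survive across invocations; keeping the list as a field and relying on the offloader's per-segment lifetime, as described above, achieves the same cleanup.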




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

