codelipenghui commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r560286228



##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
         return promise;
     }
 
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, 
UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, 
String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, 
config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), 
blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), 
TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public OfferEntryResult offerEntry(Entry entry) {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return 
BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+
+            @Override
+            public boolean close() {
+                return BlobStoreManagedLedgerOffloader.this.closeSegment();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, 
offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, 
ml.getClosedLedgerInfo(blockLedgerId).get());
+            
streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
+        Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        
partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, 
partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, 
streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: 
{}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, 
streamingMpu.id());
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            log.debug("segment closed, buffer is empty");
+            try {
+                blobStore.completeMultipartUpload(streamingMpu, 
streamingParts);

Review comment:
       If the streamingParts does not clear, for the next offload loop, how to 
avoid offload the duplicate data?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to