Renkai commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r560246648
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
return promise;
}
+ BlobStore blobStore;
+ String streamingDataBlockKey;
+ String streamingDataIndexKey;
+ MultipartUpload streamingMpu = null;
+ List<MultipartPart> streamingParts = Lists.newArrayList();
+
+ @Override
+ public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml,
UUID uuid, long beginLedger,
+ long beginEntry,
+ Map<String,
String> driverMetadata) {
+ this.ml = ml;
+ this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry,
config.getDriver(), driverMetadata);
+ log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+ this.offloadResult = new CompletableFuture<>();
+ blobStore = blobStores.get(config.getBlobStoreLocation());
+ streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+ streamingDataBlockKey = segmentInfo.uuid.toString();
+ streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+ BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+ DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+ Blob blob = blobBuilder.build();
+ streamingMpu = blobStore
+ .initiateMultipartUpload(config.getBucket(),
blob.getMetadata(), new PutOptions());
+
+ scheduler.chooseThread(segmentInfo).execute(() -> {
+ log.info("start offloading segment: {}", segmentInfo);
+ streamingOffloadLoop(1, 0);
+ });
+ scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(),
TimeUnit.MILLISECONDS);
+
+ return CompletableFuture.completedFuture(new OffloadHandle() {
+ @Override
+ public boolean canOffer(long size) {
+ return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+ }
+
+ @Override
+ public PositionImpl lastOffered() {
+ return BlobStoreManagedLedgerOffloader.this.lastOffered();
+ }
+
+ @Override
+ public OfferEntryResult offerEntry(Entry entry) {
+ return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+ }
+
+ @Override
+ public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+ return
BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+ }
+
+ @Override
+ public boolean close() {
+ return BlobStoreManagedLedgerOffloader.this.closeSegment();
+ }
+ });
+ }
+
+ private void streamingOffloadLoop(int partId, int dataObjectLength) {
+ log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ offloadResult.complete(segmentInfo.result());
+ return;
+ }
+ final BufferedOffloadStream payloadStream;
+
+ while (offloadBuffer.isEmpty()) {
+ if (segmentInfo.isClosed()) {
+ offloadResult.complete(segmentInfo.result());
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ final Entry peek = offloadBuffer.peek();
+ //initialize payload when there is at least one entry
+ final long blockLedgerId = peek.getLedgerId();
+ final long blockEntryId = peek.getEntryId();
+ payloadStream = new BufferedOffloadStream(streamingBlockSize,
offloadBuffer, segmentInfo,
+ blockLedgerId, blockEntryId, bufferLength);
+ try {
+ streamingIndexBuilder.addLedgerMeta(blockLedgerId,
ml.getClosedLedgerInfo(blockLedgerId).get());
+ streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+ } catch (InterruptedException | ExecutionException e) {
+ offloadResult.completeExceptionally(e);
+ return;
+ }
+ log.debug("begin upload payload");
+ Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+ partPayload.getContentMetadata().setContentType("application/octet-stream");
+ streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId,
partPayload));
+ streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId,
streamingBlockSize);
+
+ log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+ config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ log.debug("segment closed, buffer is empty");
+ try {
+ blobStore.completeMultipartUpload(streamingMpu,
streamingParts);
+ streamingIndexBuilder.withDataObjectLength(dataObjectLength +
streamingBlockSize);
+ final StreamingOffloadIndexBlock index =
streamingIndexBuilder.buildStreaming();
+ final IndexInputStream indexStream = index.toStream();
+ final BlobBuilder indexBlobBuilder =
blobStore.blobBuilder(streamingDataIndexKey);
+ DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+ final InputStreamPayload indexPayLoad =
Payloads.newInputStreamPayload(indexStream);
+ indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+ indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+ final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+ .contentLength(indexStream.getStreamSize())
+ .build();
+ blobStore.putBlob(config.getBucket(), indexBlob);
+
+ offloadResult.complete(segmentInfo.result());
+ } catch (Exception e) {
+ log.error("streaming offload failed", e);
+ offloadResult.completeExceptionally(e);
+ }
+ log.debug("offload done");
+ } else {
+ log.debug("continue offload loop");
+ scheduler.chooseThread(segmentInfo)
+ .execute(() -> streamingOffloadLoop(partId + 1,
dataObjectLength + streamingBlockSize));
Review comment:
I used schedule instead of sleep
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org