codelipenghui commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r560284528
##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
return promise;
}
+ BlobStore blobStore;
+ String streamingDataBlockKey;
+ String streamingDataIndexKey;
+ MultipartUpload streamingMpu = null;
+ List<MultipartPart> streamingParts = Lists.newArrayList();
+
+ @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+ log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+ this.offloadResult = new CompletableFuture<>();
+ blobStore = blobStores.get(config.getBlobStoreLocation());
+ streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+ streamingDataBlockKey = segmentInfo.uuid.toString();
+ streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+ BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+ DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+ Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+ scheduler.chooseThread(segmentInfo).execute(() -> {
+ log.info("start offloading segment: {}", segmentInfo);
+ streamingOffloadLoop(1, 0);
+ });
+        scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+ return CompletableFuture.completedFuture(new OffloadHandle() {
+ @Override
+ public boolean canOffer(long size) {
+ return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+ }
+
+ @Override
+ public PositionImpl lastOffered() {
+ return BlobStoreManagedLedgerOffloader.this.lastOffered();
+ }
+
+ @Override
+ public OfferEntryResult offerEntry(Entry entry) {
+ return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+ }
+
+ @Override
+ public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+ }
+
+ @Override
+ public boolean close() {
+ return BlobStoreManagedLedgerOffloader.this.closeSegment();
+ }
+ });
+ }
+
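+    // Drains the offload buffer into multipart parts, one part per loop
+    // iteration; the final iteration (segment closed, buffer empty) completes
+    // the multipart upload and writes the index blob.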
+ private void streamingOffloadLoop(int partId, int dataObjectLength) {
+ log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ offloadResult.complete(segmentInfo.result());
+ return;
+ }
+ final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+                // must return here, otherwise the loop spins forever on a closed, empty segment
+                return;
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    offloadResult.completeExceptionally(e);
+                    return;
+                }
+            }
+        }
+ final Entry peek = offloadBuffer.peek();
+ //initialize payload when there is at least one entry
+ final long blockLedgerId = peek.getLedgerId();
+ final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+ try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getClosedLedgerInfo(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+ } catch (InterruptedException | ExecutionException e) {
+ offloadResult.completeExceptionally(e);
+ return;
+ }
+ log.debug("begin upload payload");
+ Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ log.debug("segment closed, buffer is empty");
+ try {
+                blobStore.completeMultipartUpload(streamingMpu, streamingParts);
+                streamingIndexBuilder.withDataObjectLength(dataObjectLength + streamingBlockSize);
+                final StreamingOffloadIndexBlock index = streamingIndexBuilder.buildStreaming();
+                final IndexInputStream indexStream = index.toStream();
+                final BlobBuilder indexBlobBuilder = blobStore.blobBuilder(streamingDataIndexKey);
+                DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+                final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream);
+                indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+                indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+ final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+ .contentLength(indexStream.getStreamSize())
+ .build();
+ blobStore.putBlob(config.getBucket(), indexBlob);
+
+ offloadResult.complete(segmentInfo.result());
+ } catch (Exception e) {
+ log.error("streaming offload failed", e);
+ offloadResult.completeExceptionally(e);
+ }
+ log.debug("offload done");
+ } else {
+ log.debug("continue offload loop");
+ scheduler.chooseThread(segmentInfo)
+                    .execute(() -> streamingOffloadLoop(partId + 1, dataObjectLength + streamingBlockSize));
+ }
+ }
+
+ private CompletableFuture<OffloadResult> getOffloadResultAsync() {
+ return this.offloadResult;
+ }
+
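+    // Buffers one entry for the current segment. Entries must be offered in
+    // order (see naiveCheckConsecutive), and callers must handle the FAIL_*
+    // results since the segment may close or the buffer may fill concurrently.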
+ private synchronized OfferEntryResult offerEntry(Entry entry) {
+
+ if (segmentInfo.isClosed()) {
+ log.debug("Segment already closed {}", segmentInfo);
+ return OfferEntryResult.FAIL_SEGMENT_CLOSED;
+        } else if (bufferLength.get() + entry.getLength() > maxBufferLength
+                //if a single entry is larger than the full buffer, it is still ok to offer it when the buffer is empty
+                && !(entry.getLength() > maxBufferLength && offloadBuffer.isEmpty())) {
+            return OfferEntryResult.FAIL_BUFFER_FULL;
+        } else {
+            if (!naiveCheckConsecutive(lastOfferedPosition,
+                    PositionImpl.get(entry.getLedgerId(), entry.getEntryId()))) {
+                log.error("position {} and {} are not consecutive", lastOfferedPosition, entry.getPosition());
+                return OfferEntryResult.FAIL_NOT_CONSECUTIVE;
+            }
+            final EntryImpl entryImpl = EntryImpl.create(entry.getLedgerId(), entry.getEntryId(),
+                    entry.getDataBuffer());
+            offloadBuffer.add(entryImpl);
+            bufferLength.getAndAdd(entryImpl.getLength());
+            segmentLength.getAndAdd(entryImpl.getLength());
+            lastOfferedPosition = entryImpl.getPosition();
+            if (segmentLength.get() >= maxSegmentLength
+                    && System.currentTimeMillis() - segmentBeginTimeMillis >= minSegmentCloseTimeMillis) {
+ closeSegment();
+ }
+ return OfferEntryResult.SUCCESS;
+ }
+ }
+
+ private synchronized boolean closeSegment() {
+ final boolean result = !segmentInfo.isClosed();
+ log.debug("close segment {} {}", lastOfferedPosition.getLedgerId(),
lastOfferedPosition.getEntryId());
+ this.segmentInfo.closeSegment(lastOfferedPosition.getLedgerId(),
lastOfferedPosition.getEntryId());
+ return result;
+ }
+
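+    // Accepts the next entry if it immediately follows the last offered
+    // position, starts a new ledger (entryId == 0), or is the very first
+    // offer (lastOfferedPosition still at PositionImpl.latest).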
+    private static boolean naiveCheckConsecutive(PositionImpl lastOfferedPosition, PositionImpl offeringPosition) {
+        if (offeringPosition.getLedgerId() == lastOfferedPosition.getLedgerId()
+                && offeringPosition.getEntryId() == lastOfferedPosition.getEntryId() + 1) {
+ return true;
+ } else if (offeringPosition.getEntryId() == 0) {
+ return true;
+ } else {
+ // lastOfferedPosition not initialized
+ return lastOfferedPosition.equals(PositionImpl.latest);
+ }
+ }
+
+ private PositionImpl lastOffered() {
+ return lastOfferedPosition;
+ }
+
+ private boolean canOffer(long size) {
+ if (segmentInfo.isClosed()) {
+ return false;
Review comment:
But canOffer returns a boolean, right? If the broker calls canOffer and it
returns false here, does the broker then need to try adding one more entry to
check whether the offload segment is closed? Is my understanding correct?
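
For reference, a minimal sketch of the caller side being discussed, assuming the OffloadHandle interface from this diff; the import paths and the tryOffer helper are my guesses, just to illustrate the probing:

```java
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle;

public class OffloadCallerSketch {
    // Hypothetical broker-side helper: canOffer() alone cannot tell a full
    // buffer (transient) apart from a closed segment (permanent), so the
    // caller only learns the real reason from offerEntry()'s result.
    static boolean tryOffer(OffloadHandle handle, Entry entry) throws InterruptedException {
        while (true) {
            if (!handle.canOffer(entry.getLength())) {
                Thread.sleep(10); // back off, then probe again
                continue;
            }
            switch (handle.offerEntry(entry)) {
                case SUCCESS:
                    return true;
                case FAIL_BUFFER_FULL:
                    Thread.sleep(10); // transient: retry the same entry
                    break;
                case FAIL_SEGMENT_CLOSED:
                    // permanent: the caller should start a new segment instead of retrying
                    return false;
                case FAIL_NOT_CONSECUTIVE:
                default:
                    // resend starting from handle.lastOffered()
                    return false;
            }
        }
    }
}
```

If that reading is right, letting canOffer surface the closed state directly (for example, an enum mirroring OfferEntryResult) would save the extra offerEntry probe.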
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org