eolivelli commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r555100688
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -85,6 +181,34 @@
UUID uid,
Map<String, String> extraMetadata);
+    /**
+     * Begin offload the passed in ledgers to longterm storage, it will finish
+     * when a segment reached it's size or time.
+     * Metadata passed in is for inspection purposes only and should be stored
+     * alongside the segment data.
+     *
+     * When the returned OffloaderHandle.getOffloadResultAsync completes, the corresponding
+     * ledgers has been persisted to the
+     * loadterm storage, so it is safe to delete the original copy in bookkeeper.
+     *
+     * The uid is used to identify an attempt to offload. The implementation should
Review comment:
typo: uid
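(Presumably this refers to the mismatch with the method signature, which names the parameter `uuid`; the doc sentence would then read:

    * The uuid is used to identify an attempt to offload. The implementation should
)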
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -85,6 +181,34 @@
UUID uid,
Map<String, String> extraMetadata);
+    /**
+     * Begin offload the passed in ledgers to longterm storage, it will finish
+     * when a segment reached it's size or time.
+     * Metadata passed in is for inspection purposes only and should be stored
+     * alongside the segment data.
+     *
+     * When the returned OffloaderHandle.getOffloadResultAsync completes, the corresponding
+     * ledgers has been persisted to the
+     * loadterm storage, so it is safe to delete the original copy in bookkeeper.
+     *
+     * The uid is used to identify an attempt to offload. The implementation should
+     * use this to deterministically generate a unique name for the offloaded object.
+     * This uid will be stored in the managed ledger metadata before attempting the
+     * call to offload(). If a subsequent or concurrent call to streamingOffload() finds
+     * a uid in the metadata, it will attempt to cleanup this attempt with a call
+     * to #deleteOffloaded(ReadHandle,UUID). Once the offload attempt completes,
+     * the managed ledger will update its metadata again, to record the completion,
+     * ensuring that subsequent calls will not attempt to offload the same ledger
+     * again.
+     *
+     * @return an OffloaderHandle, which when `completeFuture()` completed, denotes that the offload has been successful.
+     */
+    default CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                              long beginEntry,
Review comment:
What happens if I call this method multiple times for the same ManagedLedger, with overlapping intervals?
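For illustration only (not part of the PR; the guard field and wiring are hypothetical, and `java.util.concurrent.atomic.AtomicBoolean` would need to be imported): one way the implementation could reject overlapping attempts is a simple in-flight guard that fails the returned future instead of starting a second upload:

    // Hypothetical guard: only one streaming offload may be active per offloader.
    private final AtomicBoolean streamingInProgress = new AtomicBoolean(false);

    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
                                                             long beginEntry,
                                                             Map<String, String> driverMetadata) {
        if (!streamingInProgress.compareAndSet(false, true)) {
            CompletableFuture<OffloadHandle> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                    "a streaming offload is already in progress for this offloader"));
            return failed;
        }
        // ... proceed as the current implementation does, resetting the flag
        // when offloadResult completes ...
    }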
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +260,183 @@ public String getOffloadDriverName() {
return promise;
}
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = new StreamingOffloadIndexBlockBuilderImpl();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, segmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public boolean offerEntry(EntryImpl entry) throws OffloadSegmentClosedException,
+                    OffloadNotConsecutiveException {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getRawLedgerMetadata(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
+        Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            try {
+                blobStore.completeMultipartUpload(streamingMpu, streamingParts);
+                streamingIndexBuilder.withDataObjectLength(dataObjectLength + streamingBlockSize);
+                final StreamingOffloadIndexBlock index = streamingIndexBuilder.build();
+                final StreamingOffloadIndexBlock.IndexInputStream indexStream = index.toStream();
+                final BlobBuilder indexBlobBuilder = blobStore.blobBuilder(streamingDataIndexKey);
+                DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+                final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream);
+                indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+                indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+                final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+                        .contentLength(indexStream.getStreamSize())
+                        .build();
+                blobStore.putBlob(config.getBucket(), indexBlob);
+
+                offloadResult.complete(segmentInfo.result());
+            } catch (Exception e) {
+                log.error("streaming offload failed", e);
+                offloadResult.completeExceptionally(e);
+            }
+        } else {
+            scheduler.chooseThread(segmentInfo)
+                    .execute(() -> streamingOffloadLoop(partId + 1, dataObjectLength + streamingBlockSize));
+        }
+    }
+
+    private CompletableFuture<OffloadResult> getOffloadResultAsync() {
+        return this.offloadResult;
+    }
+
+    private synchronized boolean offerEntry(EntryImpl entry) throws OffloadSegmentClosedException,
+            OffloadNotConsecutiveException {
+        if (segmentInfo.isClosed()) {
+            throw new OffloadSegmentClosedException("Segment already closed " + segmentInfo);
+        } else {
+            if (!naiveCheckConsecutive(lastOfferedPosition, entry.getPosition())) {
+                throw new OffloadNotConsecutiveException(
+                        Strings.lenientFormat("position %s and %s are not consecutive", lastOfferedPosition,
+                                entry.getPosition()));
+            }
+            entry.retain();
+            offloadBuffer.add(entry);
+            bufferLength.getAndAdd(entry.getLength());
+            segmentLength.getAndAdd(entry.getLength());
+            lastOfferedPosition = entry.getPosition();
+            if (segmentLength.get() >= maxSegmentLength) {
+                closeSegment();
+            }
+            return true;
+        }
+    }
+
+    private synchronized void closeSegment() {
+        log.debug("close segment {} {}", lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+        this.segmentInfo.closeSegment(lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+    }
+
+    private boolean naiveCheckConsecutive(PositionImpl lastOfferedPosition, PositionImpl offeringPosition) {
Review comment:
nit: static?
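Agreed, assuming the body only reads its two parameters. The body is not shown in this hunk; a hypothetical version, purely to illustrate that no instance state is needed:

    private static boolean naiveCheckConsecutive(PositionImpl lastOffered, PositionImpl offering) {
        // Within the same ledger, the entry id must advance by exactly one.
        if (offering.getLedgerId() == lastOffered.getLedgerId()) {
            return offering.getEntryId() == lastOffered.getEntryId() + 1;
        }
        // Across ledgers: a later ledger must start at entry 0.
        return offering.getLedgerId() > lastOffered.getLedgerId() && offering.getEntryId() == 0;
    }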
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +260,183 @@ public String getOffloadDriverName() {
return promise;
}
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = new StreamingOffloadIndexBlockBuilderImpl();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, segmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
Review comment:
Shouldn't we perform this operation only after the loop above has ended?
What happens if streamingOffloadLoop does not complete in time?
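One possible shape for that (hypothetical, not in the PR; assumes scheduler.schedule returns a ScheduledFuture, as any ScheduledExecutorService does):

    // Keep the handle so the timed close can be cancelled if the segment
    // completes before the timeout fires.
    ScheduledFuture<?> timedClose = scheduler.schedule(this::closeSegment,
            segmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
    offloadResult.whenComplete((result, error) -> timedClose.cancel(false));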
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +260,183 @@ public String getOffloadDriverName() {
return promise;
}
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = new StreamingOffloadIndexBlockBuilderImpl();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, segmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public boolean offerEntry(EntryImpl entry) throws OffloadSegmentClosedException,
+                    OffloadNotConsecutiveException {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getRawLedgerMetadata(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
Review comment:
We can provide more information here, e.g. which part and segment are being uploaded.
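For example, using names already in scope in this method:

    log.debug("begin uploading part {} of segment {}, block starting at {}:{}",
            partId, segmentInfo, blockLedgerId, blockEntryId);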
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -115,6 +217,16 @@
    CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
                                            Map<String, String> offloadDriverMetadata);
+    default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
+                                                        Map<String, String> offloadDriverMetadata) {
+        throw new UnsupportedClassVersionError();
Review comment:
You should return a CompletableFuture that reports the UnsupportedOperationException;
the caller of a method that returns a Future does not expect the method to throw
exceptions synchronously.
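A minimal sketch of the suggested fix, completing the future by hand (CompletableFuture.failedFuture only exists from Java 9); note the current code throws UnsupportedClassVersionError, presumably an accidental substitute for UnsupportedOperationException:

    default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
                                                        Map<String, String> offloadDriverMetadata) {
        CompletableFuture<ReadHandle> future = new CompletableFuture<>();
        future.completeExceptionally(new UnsupportedOperationException());
        return future;
    }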
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -115,6 +239,16 @@
    CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
                                            Map<String, String> offloadDriverMetadata);
+    default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
+                                                        Map<String, String> offloadDriverMetadata) {
+        throw new UnsupportedClassVersionError();
+    }
+
+    default CompletableFuture<Void> deleteOffloaded(UUID uid,
+                                                    Map<String, String> offloadDriverMetadata) {
+        throw new UnsupportedOperationException();
Review comment:
The same applies here.
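The same sketch, applied to this default method:

    default CompletableFuture<Void> deleteOffloaded(UUID uid,
                                                    Map<String, String> offloadDriverMetadata) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new UnsupportedOperationException());
        return future;
    }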
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]