codelipenghui commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r559985205
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,92 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+    interface SegmentInfo {
+        boolean isClosed();
+
+        OffloadResult result();
+    }
+
+    class OffloadResult {
+        public final long beginLedger;
+        public final long beginEntry;
+        public final long endLedger;
+        public final long endEntry;
+
+        public OffloadResult(long beginLedger, long beginEntry, long endLedger, long endEntry) {
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+        }
+    }
+
+    /**
+     * Used to store driver info, buffer entries, mark progress, etc.
+     * Create one per segment.
+     */
+    interface OffloadHandle {
+        enum OfferEntryResult {
+            SUCCESS,
+            FAIL_BUFFER_FULL,
+            FAIL_SEGMENT_CLOSED,
+            FAIL_NOT_CONSECUTIVE
+        }
+
+        /**
+         * return true when both buffer have enough size and ledger/entry id is next to the current one.
+         * @param size
+         * @return
+         */
+        boolean canOffer(long size);
+
+        default CompletableFuture<Boolean> asyncCanOffer(long size) {
Review comment:
```suggestion
default CompletableFuture<Boolean> canOfferAsync(long size) {
```
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,92 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+    interface SegmentInfo {
+        boolean isClosed();
+
+        OffloadResult result();
+    }
+
+    class OffloadResult {
+        public final long beginLedger;
+        public final long beginEntry;
+        public final long endLedger;
+        public final long endEntry;
+
+        public OffloadResult(long beginLedger, long beginEntry, long endLedger, long endEntry) {
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+        }
+    }
+
+    /**
+     * Used to store driver info, buffer entries, mark progress, etc.
+     * Create one per segment.
+     */
+    interface OffloadHandle {
+        enum OfferEntryResult {
+            SUCCESS,
+            FAIL_BUFFER_FULL,
+            FAIL_SEGMENT_CLOSED,
+            FAIL_NOT_CONSECUTIVE
+        }
+
+        /**
+         * return true when both buffer have enough size and ledger/entry id is next to the current one.
+         * @param size
+         * @return
+         */
+        boolean canOffer(long size);
+
+        default CompletableFuture<Boolean> asyncCanOffer(long size) {
+            return CompletableFuture.completedFuture(canOffer(size));
+        }
+
+        PositionImpl lastOffered();
+
+        default CompletableFuture<PositionImpl> asyncLastOffered() {
+            return CompletableFuture.completedFuture(lastOffered());
+        }
+
+        /**
+         * The caller should manually release entry no matter what the offer result is.
+         */
+        OfferEntryResult offerEntry(Entry entry);
+
+        default CompletableFuture<OfferEntryResult> asyncOfferEntry(EntryImpl entry) {
+            return CompletableFuture.completedFuture(offerEntry(entry));
Review comment:
Same as above comment.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/SegmentInfoImpl.java
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+
+import java.util.Map;
+import java.util.UUID;
+import lombok.ToString;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+
+@ToString
+public class SegmentInfoImpl implements LedgerOffloader.SegmentInfo {
Review comment:
```suggestion
public class OffloadSegmentInfoImpl implements LedgerOffloader.SegmentInfo {
```
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexBlockBuilder.java
##########
@@ -71,7 +71,7 @@
/**
* Construct OffloadIndex from an InputStream.
*/
- OffloadIndexBlock fromStream(InputStream is) throws IOException;
+ OffloadIndexBlock indexFromStream(InputStream is) throws IOException;
Review comment:
Any reason to modify this method?
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
return promise;
}
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public OfferEntryResult offerEntry(Entry entry) {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+
+            @Override
+            public boolean close() {
+                return BlobStoreManagedLedgerOffloader.this.closeSegment();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getClosedLedgerInfo(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
+        Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            log.debug("segment closed, buffer is empty");
+            try {
+                blobStore.completeMultipartUpload(streamingMpu, streamingParts);
+                streamingIndexBuilder.withDataObjectLength(dataObjectLength + streamingBlockSize);
+                final StreamingOffloadIndexBlock index = streamingIndexBuilder.buildStreaming();
+                final IndexInputStream indexStream = index.toStream();
+                final BlobBuilder indexBlobBuilder = blobStore.blobBuilder(streamingDataIndexKey);
+                DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+                final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream);
+                indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+                indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+                final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+                        .contentLength(indexStream.getStreamSize())
+                        .build();
+                blobStore.putBlob(config.getBucket(), indexBlob);
+
+                offloadResult.complete(segmentInfo.result());
+            } catch (Exception e) {
+                log.error("streaming offload failed", e);
+                offloadResult.completeExceptionally(e);
+            }
+            log.debug("offload done");
+        } else {
+            log.debug("continue offload loop");
+            scheduler.chooseThread(segmentInfo)
+                    .execute(() -> streamingOffloadLoop(partId + 1, dataObjectLength + streamingBlockSize));
+        }
+    }
+
+    private CompletableFuture<OffloadResult> getOffloadResultAsync() {
+        return this.offloadResult;
+    }
+
+    private synchronized OfferEntryResult offerEntry(Entry entry) {
+
+        if (segmentInfo.isClosed()) {
+            log.debug("Segment already closed {}", segmentInfo);
+            return OfferEntryResult.FAIL_SEGMENT_CLOSED;
+        } else if (maxBufferLength >= bufferLength.get() + entry.getLength()
+                //if single message size larger than full buffer size, then ok to offer when buffer is empty
+                && !(entry.getLength() > maxBufferLength && offloadBuffer.isEmpty())) {
+            return OfferEntryResult.FAIL_BUFFER_FULL;
+        } else {
+            if (!naiveCheckConsecutive(lastOfferedPosition,
+                    PositionImpl.get(entry.getLedgerId(), entry.getEntryId()))) {
+                log.error("position {} and {} are not consecutive", lastOfferedPosition,
+                        entry.getPosition());
+                return OfferEntryResult.FAIL_NOT_CONSECUTIVE;
+            }
+            final EntryImpl entryImpl = EntryImpl
+                    .create(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer());
+            offloadBuffer.add(entryImpl);
+            bufferLength.getAndAdd(entryImpl.getLength());
+            segmentLength.getAndAdd(entryImpl.getLength());
+            lastOfferedPosition = entryImpl.getPosition();
+            if (segmentLength.get() >= maxSegmentLength
+                    && System.currentTimeMillis() - segmentBeginTimeMillis >= minSegmentCloseTimeMillis) {
+                closeSegment();
+            }
+            return OfferEntryResult.SUCCESS;
+        }
+    }
+
+    private synchronized boolean closeSegment() {
+        final boolean result = !segmentInfo.isClosed();
+        log.debug("close segment {} {}", lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+        this.segmentInfo.closeSegment(lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+        return result;
+    }
+
+    private static boolean naiveCheckConsecutive(PositionImpl lastOfferedPosition, PositionImpl offeringPosition) {
+        if (offeringPosition.getLedgerId() == lastOfferedPosition.getLedgerId()
+                && offeringPosition.getEntryId() == lastOfferedPosition.getEntryId() + 1) {
+            return true;
+        } else if (offeringPosition.getEntryId() == 0) {
+            return true;
+        } else {
+            // lastOfferedPosition not initialized
+            return lastOfferedPosition.equals(PositionImpl.latest);
+        }
+    }
+
+    private PositionImpl lastOffered() {
+        return lastOfferedPosition;
+    }
+
+    private boolean canOffer(long size) {
+        if (segmentInfo.isClosed()) {
+            return false;
Review comment:
If it returns false when the segment is closed and also returns false when
the buffer fills up, how does the managed ledger determine which case it is?
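For illustration, one possible direction (a hypothetical sketch, not the existing API) would be to return the reason instead of a plain boolean, so the managed ledger can tell the cases apart:
```java
// Hypothetical sketch: make the refusal reason explicit so the caller can
// react differently to "retry later" vs. "open a new segment".
enum CanOfferResult {
    YES,             // the entry fits and can be offered now
    BUFFER_FULL,     // transient: retry once the buffer drains
    SEGMENT_CLOSED   // permanent for this segment: start a new one
}

CanOfferResult canOffer(long size);
```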
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
return promise;
}
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public OfferEntryResult offerEntry(Entry entry) {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+
+            @Override
+            public boolean close() {
+                return BlobStoreManagedLedgerOffloader.this.closeSegment();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getClosedLedgerInfo(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
+        Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            log.debug("segment closed, buffer is empty");
+            try {
+                blobStore.completeMultipartUpload(streamingMpu, streamingParts);
Review comment:
Is `streamingParts` cleared automatically after the upload completes? I
can't find any place that clears `streamingParts`. BTW, `streamingParts`
seems like it could be a local variable.
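A rough sketch of the local-variable idea (hypothetical; it assumes the recursive loop threads the list through each call):
```java
// Hypothetical sketch: scope the parts list to a single segment upload
// instead of keeping it as instance state shared across segments.
private void streamingOffloadLoop(List<MultipartPart> parts, int partId, int dataObjectLength) {
    // ... build partPayload for the current block as before ...
    parts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
    // pass the same list along so completeMultipartUpload sees every part
    scheduler.chooseThread(segmentInfo)
            .execute(() -> streamingOffloadLoop(parts, partId + 1, dataObjectLength + streamingBlockSize));
}
```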
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -595,4 +594,10 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
* Get the ManagedLedgerInterceptor for ManagedLedger.
* */
ManagedLedgerInterceptor getManagedLedgerInterceptor();
+
+ /**
+ * Get basic ledger summary after the ledger is closed.
+ * will got exception if corresponding ledger was not closed when the method called.
+ */
+ CompletableFuture<LedgerInfo> getClosedLedgerInfo(long ledgerId);
Review comment:
Is it better to name it `getLedgerInfo` and not distinguish whether the
ledger is closed? The closed state should be contained in the LedgerInfo
(currently I think we use entries=0 to determine whether the ledger is
closed or not).
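For example, a sketch under the assumption that a closed ledger can be recognized from its metadata (assuming the protobuf `LedgerInfo` only has its entry count set once the ledger is closed):
```java
// Hypothetical sketch: one lookup method, no open/closed distinction in the name.
CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);

// Caller side: decide from the metadata itself whether the ledger is closed.
ml.getLedgerInfo(ledgerId).thenAccept(info -> {
    boolean closed = info.hasEntries(); // assumption: entries set only after close
    // ...
});
```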
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1685,6 +1686,23 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
return getLedgerHandle(ledgerId).thenApply(rh -> rh.getLedgerMetadata().toSafeString());
}
+ @Override
+ public CompletableFuture<LedgerInfo> getClosedLedgerInfo(long ledgerId) {
+ CompletableFuture<LedgerInfo> result = new CompletableFuture<>();
+ final LedgerInfo ledgerInfo = ledgers.get(ledgerId);
+ if (ledgerInfo == null) {
+ final ManagedLedgerException exception = new ManagedLedgerException(
+ Strings.lenientFormat("ledger with id %s not found", ledgerId));
+ result.completeExceptionally(exception);
Review comment:
Is it better to define the result as
`CompletableFuture<Optional<LedgerInfo>>`? If we use an exception, it's
better to define a specific exception, because we need a way to determine
whether the exception is a `ledger not found` exception; using a generic
ManagedLedgerException to determine whether the ledger exists seems a bit
confusing.
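A sketch of the `Optional` variant (hypothetical signature):
```java
// Hypothetical sketch: "not found" becomes an empty Optional instead of an
// exception the caller has to pattern-match on.
CompletableFuture<Optional<LedgerInfo>> getLedgerInfo(long ledgerId);

ml.getLedgerInfo(ledgerId).thenAccept(maybeInfo -> {
    if (!maybeInfo.isPresent()) {
        // ledger not found: handled explicitly, no exception type checks needed
        return;
    }
    LedgerInfo info = maybeInfo.get();
    // ...
});
```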
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
##########
@@ -151,6 +151,18 @@ public OffloadInProgressException(String msg) {
}
}
+ public static class OffloadSegmentClosedException extends ManagedLedgerException {
+ public OffloadSegmentClosedException(String msg) {
+ super(msg);
+ }
+ }
+
+ public static class OffloadNotConsecutiveException extends ManagedLedgerException {
Review comment:
Do we need this exception? As I mentioned before, there are some markers
in the managed ledger; they will affect the continuity of entry ids when
offloading data to tiered storage.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/SegmentInfoImpl.java
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+
+import java.util.Map;
+import java.util.UUID;
+import lombok.ToString;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+
+@ToString
+public class SegmentInfoImpl implements LedgerOffloader.SegmentInfo {
+    public SegmentInfoImpl(UUID uuid, long beginLedger, long beginEntry, String driverName,
+                           Map<String, String> driverMetadata) {
+        this.uuid = uuid;
+        this.beginLedger = beginLedger;
+        this.beginEntry = beginEntry;
+        this.driverName = driverName;
+        this.driverMetadata = driverMetadata;
+    }
+
+
+    public final UUID uuid;
+    public final long beginLedger;
Review comment:
```suggestion
public final long beginLedgerId;
```
Please check all the other fields as well.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,92 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+    interface SegmentInfo {
+        boolean isClosed();
+
+        OffloadResult result();
+    }
+
+    class OffloadResult {
+        public final long beginLedger;
+        public final long beginEntry;
+        public final long endLedger;
+        public final long endEntry;
+
+        public OffloadResult(long beginLedger, long beginEntry, long endLedger, long endEntry) {
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+        }
+    }
+
+    /**
+     * Used to store driver info, buffer entries, mark progress, etc.
+     * Create one per segment.
+     */
+    interface OffloadHandle {
+        enum OfferEntryResult {
+            SUCCESS,
+            FAIL_BUFFER_FULL,
+            FAIL_SEGMENT_CLOSED,
+            FAIL_NOT_CONSECUTIVE
+        }
+
+        /**
+         * return true when both buffer have enough size and ledger/entry id is next to the current one.
+         * @param size
+         * @return
+         */
+        boolean canOffer(long size);
+
+        default CompletableFuture<Boolean> asyncCanOffer(long size) {
+            return CompletableFuture.completedFuture(canOffer(size));
+        }
+
+        PositionImpl lastOffered();
+
+        default CompletableFuture<PositionImpl> asyncLastOffered() {
+            return CompletableFuture.completedFuture(lastOffered());
+        }
+
+        /**
+         * The caller should manually release entry no matter what the offer result is.
+         */
+        OfferEntryResult offerEntry(Entry entry);
+
+        default CompletableFuture<OfferEntryResult> asyncOfferEntry(EntryImpl entry) {
+            return CompletableFuture.completedFuture(offerEntry(entry));
+        }
+
+        CompletableFuture<OffloadResult> getOffloadResultAsync();
+
+        /**
+         * Manually close current offloading segment
+         * @return true if the segment is not already closed
+         */
+        boolean close();
+
+        default CompletableFuture<Boolean> AsyncClose() {
+            return CompletableFuture.completedFuture(close());
Review comment:
Same as the above comment.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,92 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+ interface SegmentInfo {
Review comment:
Is it used by the managed ledger or any other component? If it is only for
offloader internals, we don't need to expose it.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,92 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+    interface SegmentInfo {
+        boolean isClosed();
+
+        OffloadResult result();
+    }
+
+    class OffloadResult {
+        public final long beginLedger;
+        public final long beginEntry;
+        public final long endLedger;
+        public final long endEntry;
+
+        public OffloadResult(long beginLedger, long beginEntry, long endLedger, long endEntry) {
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+        }
+    }
+
+    /**
+     * Used to store driver info, buffer entries, mark progress, etc.
+     * Create one per segment.
+     */
+    interface OffloadHandle {
+        enum OfferEntryResult {
+            SUCCESS,
+            FAIL_BUFFER_FULL,
+            FAIL_SEGMENT_CLOSED,
+            FAIL_NOT_CONSECUTIVE
+        }
+
+        /**
+         * return true when both buffer have enough size and ledger/entry id is next to the current one.
+         * @param size
+         * @return
+         */
+        boolean canOffer(long size);
+
+        default CompletableFuture<Boolean> asyncCanOffer(long size) {
+            return CompletableFuture.completedFuture(canOffer(size));
+        }
+
+        PositionImpl lastOffered();
+
+        default CompletableFuture<PositionImpl> asyncLastOffered() {
+            return CompletableFuture.completedFuture(lastOffered());
Review comment:
Same as the above comment; you can remove the default implementation from
the interface.
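That is, something along these lines (a sketch of the resulting interface shape, using the `...Async` naming suggested earlier):
```java
// Hypothetical sketch: async-only interface, with no default implementations
// that merely wrap the synchronous calls.
interface OffloadHandle {
    CompletableFuture<Boolean> canOfferAsync(long size);
    CompletableFuture<PositionImpl> lastOfferedAsync();
    CompletableFuture<OfferEntryResult> offerEntryAsync(Entry entry);
    CompletableFuture<OffloadResult> getOffloadResultAsync();
    CompletableFuture<Boolean> closeAsync();
}
```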
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
return promise;
}
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public OfferEntryResult offerEntry(Entry entry) {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+
+            @Override
+            public boolean close() {
+                return BlobStoreManagedLedgerOffloader.this.closeSegment();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getClosedLedgerInfo(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
+        Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            log.debug("segment closed, buffer is empty");
+            try {
+                blobStore.completeMultipartUpload(streamingMpu, streamingParts);
+                streamingIndexBuilder.withDataObjectLength(dataObjectLength + streamingBlockSize);
+                final StreamingOffloadIndexBlock index = streamingIndexBuilder.buildStreaming();
+                final IndexInputStream indexStream = index.toStream();
+                final BlobBuilder indexBlobBuilder = blobStore.blobBuilder(streamingDataIndexKey);
+                DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+                final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream);
+                indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+                indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+                final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+                        .contentLength(indexStream.getStreamSize())
+                        .build();
+                blobStore.putBlob(config.getBucket(), indexBlob);
+
+                offloadResult.complete(segmentInfo.result());
+            } catch (Exception e) {
+                log.error("streaming offload failed", e);
+                offloadResult.completeExceptionally(e);
+            }
+            log.debug("offload done");
+        } else {
+            log.debug("continue offload loop");
+            scheduler.chooseThread(segmentInfo)
+                    .execute(() -> streamingOffloadLoop(partId + 1, dataObjectLength + streamingBlockSize));
+        }
+    }
+
+    private CompletableFuture<OffloadResult> getOffloadResultAsync() {
+        return this.offloadResult;
+    }
+
+    private synchronized OfferEntryResult offerEntry(Entry entry) {
+
+        if (segmentInfo.isClosed()) {
+            log.debug("Segment already closed {}", segmentInfo);
+            return OfferEntryResult.FAIL_SEGMENT_CLOSED;
+        } else if (maxBufferLength >= bufferLength.get() + entry.getLength()
+                //if single message size larger than full buffer size, then ok to offer when buffer is empty
+                && !(entry.getLength() > maxBufferLength && offloadBuffer.isEmpty())) {
+            return OfferEntryResult.FAIL_BUFFER_FULL;
+        } else {
+            if (!naiveCheckConsecutive(lastOfferedPosition,
+                    PositionImpl.get(entry.getLedgerId(), entry.getEntryId()))) {
+                log.error("position {} and {} are not consecutive", lastOfferedPosition,
+                        entry.getPosition());
+                return OfferEntryResult.FAIL_NOT_CONSECUTIVE;
+            }
+            final EntryImpl entryImpl = EntryImpl
+                    .create(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer());
+            offloadBuffer.add(entryImpl);
+            bufferLength.getAndAdd(entryImpl.getLength());
+            segmentLength.getAndAdd(entryImpl.getLength());
+            lastOfferedPosition = entryImpl.getPosition();
+            if (segmentLength.get() >= maxSegmentLength
+                    && System.currentTimeMillis() - segmentBeginTimeMillis >= minSegmentCloseTimeMillis) {
+                closeSegment();
+            }
+            return OfferEntryResult.SUCCESS;
+        }
+    }
+
+    private synchronized boolean closeSegment() {
+        final boolean result = !segmentInfo.isClosed();
+        log.debug("close segment {} {}", lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+        this.segmentInfo.closeSegment(lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
+        return result;
+    }
+
+    private static boolean naiveCheckConsecutive(PositionImpl lastOfferedPosition, PositionImpl offeringPosition) {
+        if (offeringPosition.getLedgerId() == lastOfferedPosition.getLedgerId()
+                && offeringPosition.getEntryId() == lastOfferedPosition.getEntryId() + 1) {
+            return true;
+        } else if (offeringPosition.getEntryId() == 0) {
Review comment:
It's not correct here, because it might skip all the messages of a ledger:
for example, if the last offered position is 5:100 and the next entry is
7:0, the `entryId == 0` branch accepts it even though ledger 6 may have
contained entries. As I mentioned before, it's hard to determine
consecutiveness at the offloader, and the markers will also affect this
behavior in the future.
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingDataBlockHeaderImpl.java
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.io.CountingInputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
+/**
+ * The data block header in code storage for each data block.
+ */
+public class StreamingDataBlockHeaderImpl implements DataBlockHeader {
Review comment:
`StreamingDataBlockHeaderImpl` -> `DataBlockHeaderImplV2`? @Renkai
@sijie The header format can be used by different offload implementations;
I think coupling it with streaming offload is not a good choice.
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
return promise;
}
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public OfferEntryResult offerEntry(Entry entry) {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+
+            @Override
+            public boolean close() {
+                return BlobStoreManagedLedgerOffloader.this.closeSegment();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getClosedLedgerInfo(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
+        Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            log.debug("segment closed, buffer is empty");
+            try {
+                blobStore.completeMultipartUpload(streamingMpu, streamingParts);
+                streamingIndexBuilder.withDataObjectLength(dataObjectLength + streamingBlockSize);
+                final StreamingOffloadIndexBlock index = streamingIndexBuilder.buildStreaming();
+                final IndexInputStream indexStream = index.toStream();
+                final BlobBuilder indexBlobBuilder = blobStore.blobBuilder(streamingDataIndexKey);
+                DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+                final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream);
Review comment:
Does it need to be closed?
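For example, something along these lines (a sketch, assuming the index stream should be released once the index blob is written):
```java
// Hypothetical sketch: close the index stream deterministically with
// try-with-resources after the index blob is uploaded.
try (IndexInputStream indexStream = index.toStream()) {
    final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream);
    indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
    indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
    final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
            .contentLength(indexStream.getStreamSize())
            .build();
    blobStore.putBlob(config.getBucket(), indexBlob);
}
```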
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
return promise;
}
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public OfferEntryResult offerEntry(Entry entry) {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+
+            @Override
+            public boolean close() {
+                return BlobStoreManagedLedgerOffloader.this.closeSegment();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getClosedLedgerInfo(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
+        Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            log.debug("segment closed, buffer is empty");
Review comment:
We can provide more information in this log message.
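For example (hypothetical wording, using fields already available in this method):
```java
// Include which segment finished and how many parts were uploaded.
log.debug("segment {} closed and buffer drained after {} parts, completing multipart upload {}",
        segmentInfo, partId, streamingMpu.id());
```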
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStream.java
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.SegmentInfoImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
+@Slf4j
+public class BufferedOffloadStream extends InputStream {
+ static final int[] BLOCK_END_PADDING = BlockAwareSegmentInputStreamImpl.BLOCK_END_PADDING;
+ private final SegmentInfoImpl segmentInfo;
+
+ private final long ledgerId;
+ private final long beginEntryId;
+ private AtomicLong bufferLength;
+ static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
+ private final long blockSize;
+ private final ConcurrentLinkedQueue<Entry> entryBuffer;
+ private final InputStream blockHead;
+ int offset = 0;
+ static int NOT_INITIALIZED = -1;
Review comment:
Should this be `final`?
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
return promise;
}
+    BlobStore blobStore;
+    String streamingDataBlockKey;
+    String streamingDataIndexKey;
+    MultipartUpload streamingMpu = null;
+    List<MultipartPart> streamingParts = Lists.newArrayList();
+
+    @Override
+    public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml, UUID uuid, long beginLedger,
+                                                             long beginEntry,
+                                                             Map<String, String> driverMetadata) {
+        this.ml = ml;
+        this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(), driverMetadata);
+        log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+        this.offloadResult = new CompletableFuture<>();
+        blobStore = blobStores.get(config.getBlobStoreLocation());
+        streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+        streamingDataBlockKey = segmentInfo.uuid.toString();
+        streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+        BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+        DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+        Blob blob = blobBuilder.build();
+        streamingMpu = blobStore
+                .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
+
+        scheduler.chooseThread(segmentInfo).execute(() -> {
+            log.info("start offloading segment: {}", segmentInfo);
+            streamingOffloadLoop(1, 0);
+        });
+        scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
+
+        return CompletableFuture.completedFuture(new OffloadHandle() {
+            @Override
+            public boolean canOffer(long size) {
+                return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+            }
+
+            @Override
+            public PositionImpl lastOffered() {
+                return BlobStoreManagedLedgerOffloader.this.lastOffered();
+            }
+
+            @Override
+            public OfferEntryResult offerEntry(Entry entry) {
+                return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+            }
+
+            @Override
+            public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+                return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+            }
+
+            @Override
+            public boolean close() {
+                return BlobStoreManagedLedgerOffloader.this.closeSegment();
+            }
+        });
+    }
+
+    private void streamingOffloadLoop(int partId, int dataObjectLength) {
+        log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            offloadResult.complete(segmentInfo.result());
+            return;
+        }
+        final BufferedOffloadStream payloadStream;
+
+        while (offloadBuffer.isEmpty()) {
+            if (segmentInfo.isClosed()) {
+                offloadResult.complete(segmentInfo.result());
+            } else {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        final Entry peek = offloadBuffer.peek();
+        //initialize payload when there is at least one entry
+        final long blockLedgerId = peek.getLedgerId();
+        final long blockEntryId = peek.getEntryId();
+        payloadStream = new BufferedOffloadStream(streamingBlockSize, offloadBuffer, segmentInfo,
+                blockLedgerId, blockEntryId, bufferLength);
+        try {
+            streamingIndexBuilder.addLedgerMeta(blockLedgerId, ml.getClosedLedgerInfo(blockLedgerId).get());
+            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+        } catch (InterruptedException | ExecutionException e) {
+            offloadResult.completeExceptionally(e);
+            return;
+        }
+        log.debug("begin upload payload");
+        Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+        partPayload.getContentMetadata().setContentType("application/octet-stream");
+        streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
+        streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId, streamingBlockSize);
+
+        log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
+                config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
+        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+            log.debug("segment closed, buffer is empty");
+            try {
+                blobStore.completeMultipartUpload(streamingMpu, streamingParts);
+                streamingIndexBuilder.withDataObjectLength(dataObjectLength + streamingBlockSize);
+                final StreamingOffloadIndexBlock index = streamingIndexBuilder.buildStreaming();
+                final IndexInputStream indexStream = index.toStream();
+                final BlobBuilder indexBlobBuilder = blobStore.blobBuilder(streamingDataIndexKey);
+                DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+                final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream);
+                indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+                indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+                final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+                        .contentLength(indexStream.getStreamSize())
+                        .build();
+                blobStore.putBlob(config.getBucket(), indexBlob);
+
+                offloadResult.complete(segmentInfo.result());
+            } catch (Exception e) {
+                log.error("streaming offload failed", e);
+                offloadResult.completeExceptionally(e);
+            }
+            log.debug("offload done");
+        } else {
+            log.debug("continue offload loop");
Review comment:
Same as above comment.
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStream.java
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.SegmentInfoImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
+@Slf4j
+public class BufferedOffloadStream extends InputStream {
+ static final int[] BLOCK_END_PADDING = BlockAwareSegmentInputStreamImpl.BLOCK_END_PADDING;
+ private final SegmentInfoImpl segmentInfo;
+
+ private final long ledgerId;
+ private final long beginEntryId;
+ private AtomicLong bufferLength;
+ static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
+ private final long blockSize;
+ private final ConcurrentLinkedQueue<Entry> entryBuffer;
+ private final InputStream blockHead;
+ int offset = 0;
+ static int NOT_INITIALIZED = -1;
+ int validDataOffset = NOT_INITIALIZED;
+ CompositeByteBuf currentEntry;
+
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ public long getBeginEntryId() {
+ return beginEntryId;
+ }
+
+ public long getBlockSize() {
+ return blockSize;
+ }
+
+
+ public BufferedOffloadStream(int blockSize,
+ ConcurrentLinkedQueue<Entry> entryBuffer,
+ SegmentInfoImpl segmentInfo,
+ long ledgerId,
+ long beginEntryId,
+ AtomicLong bufferLength) {
+ this.ledgerId = ledgerId;
+ this.beginEntryId = beginEntryId;
+ this.blockSize = blockSize;
+ this.segmentInfo = segmentInfo;
+ this.entryBuffer = entryBuffer;
+ this.bufferLength = bufferLength;
+ this.blockHead = StreamingDataBlockHeaderImpl.of(blockSize, ledgerId, beginEntryId)
+ .toStream();
+ }
+
+
+ @Override
+ public int read() throws IOException {
+ if (blockHead.available() > 0) {
+ offset++;
+ return blockHead.read();
+ }
+ //if current exists, use current first
+ if (currentEntry != null) {
+ if (currentEntry.readableBytes() > 0) {
+ offset += 1;
+ return currentEntry.readUnsignedByte();
+ } else {
+ currentEntry.release();
+ currentEntry = null;
+ }
+ }
+
+ if (blockSize <= offset) {
+ return -1;
+ } else if (validDataOffset != NOT_INITIALIZED) {
+ return BLOCK_END_PADDING[(offset++ - validDataOffset) %
BLOCK_END_PADDING.length];
+ }
+
+ Entry headEntry;
+
+ while ((headEntry = entryBuffer.peek()) == null) {
+ if (segmentInfo.isClosed()) {
+ if (validDataOffset == NOT_INITIALIZED) {
+ validDataOffset = offset;
+ }
+ return read();
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ log.error("sleep failed", e);
+ }
+ }
+ }
+
+ //create new block when a ledger end
+ if (headEntry.getLedgerId() != this.ledgerId) {
+ if (validDataOffset == NOT_INITIALIZED) {
+ validDataOffset = offset;
+ }
+ return read();
+ }
+
+ if (blockSize >= offset
+ + ENTRY_HEADER_SIZE
+ + headEntry.getLength()) {
+ entryBuffer.poll();
Review comment:
Should we peek first? If the next entry exceeds the max block size, what is
the expected behavior?
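For illustration, a minimal sketch of the guard I have in mind, reusing the
names from this class (`entryBuffer`, `blockSize`, `offset`,
`ENTRY_HEADER_SIZE`); the fail-fast branch is a hypothetical choice, not
necessarily the right policy:
```java
// Sketch only: decide on the peeked entry before polling, so an entry that
// can never fit is not silently stuck at the head of the queue.
Entry headEntry = entryBuffer.peek();
if (headEntry != null
        && ENTRY_HEADER_SIZE + headEntry.getLength() > blockSize) {
    // Hypothetical handling: this entry exceeds the whole block, so the loop
    // would spin forever; surface the problem instead.
    throw new IOException("Entry " + headEntry.getEntryId() + " of ledger "
            + headEntry.getLedgerId() + " (" + headEntry.getLength()
            + " bytes) cannot fit into block size " + blockSize);
}
if (headEntry != null
        && blockSize >= offset + ENTRY_HEADER_SIZE + headEntry.getLength()) {
    entryBuffer.poll(); // safe: the peeked entry is known to fit this block
}
```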
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
return promise;
}
+ BlobStore blobStore;
+ String streamingDataBlockKey;
+ String streamingDataIndexKey;
+ MultipartUpload streamingMpu = null;
+ List<MultipartPart> streamingParts = Lists.newArrayList();
+
+ @Override
+ public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml,
UUID uuid, long beginLedger,
+ long beginEntry,
+ Map<String,
String> driverMetadata) {
+ this.ml = ml;
+ this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry,
config.getDriver(), driverMetadata);
+ log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+ this.offloadResult = new CompletableFuture<>();
+ blobStore = blobStores.get(config.getBlobStoreLocation());
+ streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+ streamingDataBlockKey = segmentInfo.uuid.toString();
+ streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+ BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+ DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+ Blob blob = blobBuilder.build();
+ streamingMpu = blobStore
+ .initiateMultipartUpload(config.getBucket(),
blob.getMetadata(), new PutOptions());
+
+ scheduler.chooseThread(segmentInfo).execute(() -> {
+ log.info("start offloading segment: {}", segmentInfo);
+ streamingOffloadLoop(1, 0);
+ });
+ scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(),
TimeUnit.MILLISECONDS);
+
+ return CompletableFuture.completedFuture(new OffloadHandle() {
+ @Override
+ public boolean canOffer(long size) {
+ return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+ }
+
+ @Override
+ public PositionImpl lastOffered() {
+ return BlobStoreManagedLedgerOffloader.this.lastOffered();
+ }
+
+ @Override
+ public OfferEntryResult offerEntry(Entry entry) {
+ return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+ }
+
+ @Override
+ public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+ return
BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+ }
+
+ @Override
+ public boolean close() {
+ return BlobStoreManagedLedgerOffloader.this.closeSegment();
+ }
+ });
+ }
+
+ private void streamingOffloadLoop(int partId, int dataObjectLength) {
+ log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ offloadResult.complete(segmentInfo.result());
+ return;
+ }
+ final BufferedOffloadStream payloadStream;
+
+ while (offloadBuffer.isEmpty()) {
+ if (segmentInfo.isClosed()) {
+ offloadResult.complete(segmentInfo.result());
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ final Entry peek = offloadBuffer.peek();
+ //initialize payload when there is at least one entry
+ final long blockLedgerId = peek.getLedgerId();
+ final long blockEntryId = peek.getEntryId();
+ payloadStream = new BufferedOffloadStream(streamingBlockSize,
offloadBuffer, segmentInfo,
+ blockLedgerId, blockEntryId, bufferLength);
+ try {
+ streamingIndexBuilder.addLedgerMeta(blockLedgerId,
ml.getClosedLedgerInfo(blockLedgerId).get());
+
streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+ } catch (InterruptedException | ExecutionException e) {
+ offloadResult.completeExceptionally(e);
+ return;
+ }
+ log.debug("begin upload payload");
+ Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+
partPayload.getContentMetadata().setContentType("application/octet-stream");
+ streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId,
partPayload));
+ streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId,
streamingBlockSize);
+
+ log.debug("UploadMultipartPart. container: {}, blobName: {}, partId:
{}, mpu: {}",
+ config.getBucket(), streamingDataBlockKey, partId,
streamingMpu.id());
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ log.debug("segment closed, buffer is empty");
+ try {
+ blobStore.completeMultipartUpload(streamingMpu,
streamingParts);
+ streamingIndexBuilder.withDataObjectLength(dataObjectLength +
streamingBlockSize);
+ final StreamingOffloadIndexBlock index =
streamingIndexBuilder.buildStreaming();
+ final IndexInputStream indexStream = index.toStream();
+ final BlobBuilder indexBlobBuilder =
blobStore.blobBuilder(streamingDataIndexKey);
+ DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+ final InputStreamPayload indexPayLoad =
Payloads.newInputStreamPayload(indexStream);
+
indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+
indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+ final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+ .contentLength(indexStream.getStreamSize())
+ .build();
+ blobStore.putBlob(config.getBucket(), indexBlob);
+
+ offloadResult.complete(segmentInfo.result());
+ } catch (Exception e) {
+ log.error("streaming offload failed", e);
+ offloadResult.completeExceptionally(e);
+ }
+ log.debug("offload done");
Review comment:
Please provide more information; if you have many topics, this debug log
can't help you find anything here.
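For example, something along these lines (field names taken from the
surrounding class; the exact set is only a suggestion) would make the line
actionable:
```java
// Sketch: carry enough context to correlate this event with one
// topic/segment when many offloads are interleaved in the same log.
log.debug("Offload done. segment={}, dataBlockKey={}, indexKey={}, parts={}",
        segmentInfo, streamingDataBlockKey, streamingDataIndexKey,
        streamingParts.size());
```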
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingBlobStoreBackedReadHandleImpl.java
##########
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.val;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlock;
+import
org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlockBuilder;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingBlobStoreBackedReadHandleImpl implements ReadHandle {
+ private static final Logger log =
LoggerFactory.getLogger(StreamingBlobStoreBackedReadHandleImpl.class);
+
+ private final long ledgerId;
+ private final List<StreamingOffloadIndexBlock> indices;
+ private final List<BackedInputStream> inputStreams;
+ private final List<DataInputStream> dataStreams;
+ private final ExecutorService executor;
+
+ static class GroupedReader {
+ long ledgerId;
+ long firstEntry;
+ long lastEntry;
+ StreamingOffloadIndexBlock index;
+ BackedInputStream inputStream;
+ DataInputStream dataStream;
+
+ public GroupedReader(long ledgerId, long firstEntry, long lastEntry,
+ StreamingOffloadIndexBlock index,
+ BackedInputStream inputStream, DataInputStream
dataStream) {
+ this.ledgerId = ledgerId;
+ this.firstEntry = firstEntry;
+ this.lastEntry = lastEntry;
+ this.index = index;
+ this.inputStream = inputStream;
+ this.dataStream = dataStream;
+ }
+ }
+
+ private StreamingBlobStoreBackedReadHandleImpl(long ledgerId,
List<StreamingOffloadIndexBlock> indices,
+ List<BackedInputStream>
inputStreams,
+ ExecutorService executor) {
+ this.ledgerId = ledgerId;
+ this.indices = indices;
+ this.inputStreams = inputStreams;
+ this.dataStreams = new LinkedList<>();
+ for (BackedInputStream inputStream : inputStreams) {
+ dataStreams.add(new DataInputStream(inputStream));
+ }
+ this.executor = executor;
+ }
+
+ @Override
+ public long getId() {
+ return ledgerId;
+ }
+
+ @Override
+ public LedgerMetadata getLedgerMetadata() {
+ return indices.get(0).getLedgerMetadata(ledgerId);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ executor.submit(() -> {
+ try {
+ for (StreamingOffloadIndexBlock indexBlock : indices) {
+ indexBlock.close();
+ }
+ for (BackedInputStream inputStream : inputStreams) {
+ inputStream.close();
+ }
+ promise.complete(null);
+ } catch (IOException t) {
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry) {
+ log.debug("Ledger {}: reading {} - {}", getId(), firstEntry,
lastEntry);
+ CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+ if (firstEntry > lastEntry
+ || firstEntry < 0
+ || lastEntry > getLastAddConfirmed()) {
+ promise.completeExceptionally(new
BKException.BKIncorrectParameterException());
+ return promise;
+ }
+ executor.submit(() -> {
+ List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+ List<GroupedReader> groupedReaders = null;
+ try {
+ groupedReaders = getGroupedReader(firstEntry, lastEntry);
+ } catch (Exception e) {
+ promise.completeExceptionally(e);
+ return;
+ }
+
+ for (GroupedReader groupedReader : groupedReaders) {
Review comment:
Please double-check that the `groupedReaders` are sorted by ledger id and
entry id; otherwise, this will break the in-order entry read guarantee.
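If the ordering is not already guaranteed by construction, a defensive sort
with `java.util.Comparator` is cheap; a sketch using the fields of
`GroupedReader` above:
```java
// Sketch: enforce read order explicitly (by ledger id, then first entry id)
// before iterating, so the in-order guarantee does not depend on how the
// readers happened to be built.
groupedReaders.sort(
        Comparator.comparingLong((GroupedReader r) -> r.ledgerId)
                .thenComparingLong(r -> r.firstEntry));
```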
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingBlobStoreBackedReadHandleImpl.java
##########
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.val;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlock;
+import
org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlockBuilder;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingBlobStoreBackedReadHandleImpl implements ReadHandle {
+ private static final Logger log =
LoggerFactory.getLogger(StreamingBlobStoreBackedReadHandleImpl.class);
+
+ private final long ledgerId;
+ private final List<StreamingOffloadIndexBlock> indices;
+ private final List<BackedInputStream> inputStreams;
+ private final List<DataInputStream> dataStreams;
+ private final ExecutorService executor;
+
+ static class GroupedReader {
+ long ledgerId;
+ long firstEntry;
+ long lastEntry;
+ StreamingOffloadIndexBlock index;
+ BackedInputStream inputStream;
+ DataInputStream dataStream;
+
+ public GroupedReader(long ledgerId, long firstEntry, long lastEntry,
+ StreamingOffloadIndexBlock index,
+ BackedInputStream inputStream, DataInputStream
dataStream) {
+ this.ledgerId = ledgerId;
+ this.firstEntry = firstEntry;
+ this.lastEntry = lastEntry;
+ this.index = index;
+ this.inputStream = inputStream;
+ this.dataStream = dataStream;
+ }
+ }
+
+ private StreamingBlobStoreBackedReadHandleImpl(long ledgerId,
List<StreamingOffloadIndexBlock> indices,
+ List<BackedInputStream>
inputStreams,
+ ExecutorService executor) {
+ this.ledgerId = ledgerId;
+ this.indices = indices;
+ this.inputStreams = inputStreams;
+ this.dataStreams = new LinkedList<>();
+ for (BackedInputStream inputStream : inputStreams) {
+ dataStreams.add(new DataInputStream(inputStream));
+ }
+ this.executor = executor;
+ }
+
+ @Override
+ public long getId() {
+ return ledgerId;
+ }
+
+ @Override
+ public LedgerMetadata getLedgerMetadata() {
+ return indices.get(0).getLedgerMetadata(ledgerId);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ executor.submit(() -> {
+ try {
+ for (StreamingOffloadIndexBlock indexBlock : indices) {
+ indexBlock.close();
+ }
+ for (BackedInputStream inputStream : inputStreams) {
+ inputStream.close();
+ }
+ promise.complete(null);
+ } catch (IOException t) {
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry) {
+ log.debug("Ledger {}: reading {} - {}", getId(), firstEntry,
lastEntry);
+ CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+ if (firstEntry > lastEntry
+ || firstEntry < 0
+ || lastEntry > getLastAddConfirmed()) {
+ promise.completeExceptionally(new
BKException.BKIncorrectParameterException());
Review comment:
Why throw a BK exception here? Would `IllegalArgumentException` work?
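If callers don't rely on BK error codes, a sketch of the alternative (whether
readers of this handle expect `BKException` semantics is an open question):
```java
// Sketch: a plain IllegalArgumentException with the offending bounds
// spelled out, instead of a BookKeeper-specific error code.
promise.completeExceptionally(new IllegalArgumentException(String.format(
        "Invalid read range: firstEntry=%d, lastEntry=%d, lastAddConfirmed=%d",
        firstEntry, lastEntry, getLastAddConfirmed())));
```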
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/StreamingOffloadIndexBlock.java
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import java.io.Closeable;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * The Index block abstraction used for offload a ledger to long term storage.
+ */
+@Unstable
+public interface StreamingOffloadIndexBlock extends Closeable {
Review comment:
@Renkai, we are talking about the interface, not the implementation. Looking
at the `OffloadIndexBlock` interface, most of the methods are the same as in
`StreamingOffloadIndexBlock`, right?
Another unreasonable point is that the index block is a format-level concept;
streaming offload is not the only thing that can use this format, right? If we
later want a new offloader that offloads based on a time window or data size
rather than streaming, this new version of the IndexBlock should also be
usable.
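To make this concrete, a hypothetical shape for a shared, format-level
contract; the getter names here are guesses based on the builder calls visible
in this PR (`withDataObjectLength`, `withDataBlockHeaderLength`), not an
existing API:
```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical refactor sketch: one format-level index-block contract that
// both the classic and the streaming offloaders could implement.
public interface OffloadIndexBlockBase extends Closeable {
    IndexInputStream toStream() throws IOException; // serialize for upload
    long getDataObjectLength();                     // total data object length
    long getDataBlockHeaderLength();                // header length per block
}
```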
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +265,202 @@ public String getOffloadDriverName() {
return promise;
}
+ BlobStore blobStore;
+ String streamingDataBlockKey;
+ String streamingDataIndexKey;
+ MultipartUpload streamingMpu = null;
+ List<MultipartPart> streamingParts = Lists.newArrayList();
+
+ @Override
+ public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml,
UUID uuid, long beginLedger,
+ long beginEntry,
+ Map<String,
String> driverMetadata) {
+ this.ml = ml;
+ this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry,
config.getDriver(), driverMetadata);
+ log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+ this.offloadResult = new CompletableFuture<>();
+ blobStore = blobStores.get(config.getBlobStoreLocation());
+ streamingIndexBuilder = StreamingOffloadIndexBlockBuilder.create();
+ streamingDataBlockKey = segmentInfo.uuid.toString();
+ streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+ BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+ DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+ Blob blob = blobBuilder.build();
+ streamingMpu = blobStore
+ .initiateMultipartUpload(config.getBucket(),
blob.getMetadata(), new PutOptions());
+
+ scheduler.chooseThread(segmentInfo).execute(() -> {
+ log.info("start offloading segment: {}", segmentInfo);
+ streamingOffloadLoop(1, 0);
+ });
+ scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(),
TimeUnit.MILLISECONDS);
+
+ return CompletableFuture.completedFuture(new OffloadHandle() {
+ @Override
+ public boolean canOffer(long size) {
+ return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+ }
+
+ @Override
+ public PositionImpl lastOffered() {
+ return BlobStoreManagedLedgerOffloader.this.lastOffered();
+ }
+
+ @Override
+ public OfferEntryResult offerEntry(Entry entry) {
+ return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+ }
+
+ @Override
+ public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+ return
BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+ }
+
+ @Override
+ public boolean close() {
+ return BlobStoreManagedLedgerOffloader.this.closeSegment();
+ }
+ });
+ }
+
+ private void streamingOffloadLoop(int partId, int dataObjectLength) {
+ log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ offloadResult.complete(segmentInfo.result());
+ return;
+ }
+ final BufferedOffloadStream payloadStream;
+
+ while (offloadBuffer.isEmpty()) {
+ if (segmentInfo.isClosed()) {
+ offloadResult.complete(segmentInfo.result());
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ final Entry peek = offloadBuffer.peek();
+ //initialize payload when there is at least one entry
+ final long blockLedgerId = peek.getLedgerId();
+ final long blockEntryId = peek.getEntryId();
+ payloadStream = new BufferedOffloadStream(streamingBlockSize,
offloadBuffer, segmentInfo,
+ blockLedgerId, blockEntryId, bufferLength);
+ try {
+ streamingIndexBuilder.addLedgerMeta(blockLedgerId,
ml.getClosedLedgerInfo(blockLedgerId).get());
+
streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+ } catch (InterruptedException | ExecutionException e) {
+ offloadResult.completeExceptionally(e);
+ return;
+ }
+ log.debug("begin upload payload");
+ Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
+
partPayload.getContentMetadata().setContentType("application/octet-stream");
+ streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId,
partPayload));
+ streamingIndexBuilder.addBlock(blockLedgerId, blockEntryId, partId,
streamingBlockSize);
+
+ log.debug("UploadMultipartPart. container: {}, blobName: {}, partId:
{}, mpu: {}",
+ config.getBucket(), streamingDataBlockKey, partId,
streamingMpu.id());
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ log.debug("segment closed, buffer is empty");
+ try {
+ blobStore.completeMultipartUpload(streamingMpu,
streamingParts);
+ streamingIndexBuilder.withDataObjectLength(dataObjectLength +
streamingBlockSize);
+ final StreamingOffloadIndexBlock index =
streamingIndexBuilder.buildStreaming();
+ final IndexInputStream indexStream = index.toStream();
+ final BlobBuilder indexBlobBuilder =
blobStore.blobBuilder(streamingDataIndexKey);
+ DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
+ final InputStreamPayload indexPayLoad =
Payloads.newInputStreamPayload(indexStream);
+
indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
+
indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
+ final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
+ .contentLength(indexStream.getStreamSize())
+ .build();
+ blobStore.putBlob(config.getBucket(), indexBlob);
+
+ offloadResult.complete(segmentInfo.result());
+ } catch (Exception e) {
+ log.error("streaming offload failed", e);
+ offloadResult.completeExceptionally(e);
+ }
+ log.debug("offload done");
+ } else {
+ log.debug("continue offload loop");
+ scheduler.chooseThread(segmentInfo)
+ .execute(() -> streamingOffloadLoop(partId + 1,
dataObjectLength + streamingBlockSize));
Review comment:
There will be a serious problem here: the streaming offload retries getting an
entry from the queue every 100 ms, so if a topic does not write any messages,
the thread will not be available for offloading other topics until the offload
segment times out (is closed), right?
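A minimal sketch of the non-blocking alternative, reusing the `scheduler` this
class already uses for `closeSegment`: when the buffer is empty, reschedule
the loop instead of sleeping on the thread:
```java
// Sketch: yield the scheduler thread while the buffer is empty, instead of
// blocking it with Thread.sleep(100) until the segment times out.
// (Re-dispatch via scheduler.chooseThread(segmentInfo) if per-segment
// ordering must be preserved.)
if (offloadBuffer.isEmpty()) {
    if (segmentInfo.isClosed()) {
        offloadResult.complete(segmentInfo.result());
    } else {
        scheduler.schedule(() -> streamingOffloadLoop(partId, dataObjectLength),
                100, TimeUnit.MILLISECONDS);
    }
    return;
}
```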
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingBlobStoreBackedReadHandleImpl.java
##########
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.val;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlock;
+import
org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlockBuilder;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingBlobStoreBackedReadHandleImpl implements ReadHandle {
+ private static final Logger log =
LoggerFactory.getLogger(StreamingBlobStoreBackedReadHandleImpl.class);
+
+ private final long ledgerId;
+ private final List<StreamingOffloadIndexBlock> indices;
+ private final List<BackedInputStream> inputStreams;
+ private final List<DataInputStream> dataStreams;
+ private final ExecutorService executor;
+
+ static class GroupedReader {
+ long ledgerId;
+ long firstEntry;
+ long lastEntry;
+ StreamingOffloadIndexBlock index;
+ BackedInputStream inputStream;
+ DataInputStream dataStream;
+
+ public GroupedReader(long ledgerId, long firstEntry, long lastEntry,
+ StreamingOffloadIndexBlock index,
+ BackedInputStream inputStream, DataInputStream
dataStream) {
+ this.ledgerId = ledgerId;
+ this.firstEntry = firstEntry;
+ this.lastEntry = lastEntry;
+ this.index = index;
+ this.inputStream = inputStream;
+ this.dataStream = dataStream;
+ }
+ }
+
+ private StreamingBlobStoreBackedReadHandleImpl(long ledgerId,
List<StreamingOffloadIndexBlock> indices,
+ List<BackedInputStream>
inputStreams,
+ ExecutorService executor) {
+ this.ledgerId = ledgerId;
+ this.indices = indices;
+ this.inputStreams = inputStreams;
+ this.dataStreams = new LinkedList<>();
+ for (BackedInputStream inputStream : inputStreams) {
+ dataStreams.add(new DataInputStream(inputStream));
+ }
+ this.executor = executor;
+ }
+
+ @Override
+ public long getId() {
+ return ledgerId;
+ }
+
+ @Override
+ public LedgerMetadata getLedgerMetadata() {
+ return indices.get(0).getLedgerMetadata(ledgerId);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ executor.submit(() -> {
+ try {
+ for (StreamingOffloadIndexBlock indexBlock : indices) {
+ indexBlock.close();
+ }
+ for (BackedInputStream inputStream : inputStreams) {
+ inputStream.close();
+ }
+ promise.complete(null);
Review comment:
Should we also close the `dataStreams`?
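For instance (a sketch; since `DataInputStream` extends `FilterInputStream`,
closing it also closes the `BackedInputStream` it wraps, so this loop could
even replace the one over `inputStreams`):
```java
// Sketch: close the DataInputStream wrappers as well; FilterInputStream's
// close() also closes the underlying stream it decorates.
for (DataInputStream dataStream : dataStreams) {
    dataStream.close();
}
```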
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStream.java
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.SegmentInfoImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
+@Slf4j
+public class BufferedOffloadStream extends InputStream {
Review comment:
Does the `BufferedOffloadStream` need to be closed after the offload
completes? I notice that a `BufferedOffloadStream` is created on each
iteration of the offload loop, but I can't find where it is closed.
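A sketch of one way to handle it in `streamingOffloadLoop`, assuming
`uploadMultipartPart` fully consumes the payload before returning (the jclouds
blocking API suggests it does, but that should be confirmed):
```java
// Sketch: scope the per-block stream with try-with-resources so it is closed
// once its multipart part has been uploaded.
try (BufferedOffloadStream payloadStream = new BufferedOffloadStream(
        streamingBlockSize, offloadBuffer, segmentInfo,
        blockLedgerId, blockEntryId, bufferLength)) {
    Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
    partPayload.getContentMetadata().setContentType("application/octet-stream");
    streamingParts.add(
            blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
} catch (IOException e) {
    // close() may throw; a custom override could also release currentEntry
    offloadResult.completeExceptionally(e);
    return;
}
```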
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -230,6 +260,183 @@ public String getOffloadDriverName() {
return promise;
}
+ BlobStore blobStore;
+ String streamingDataBlockKey;
+ String streamingDataIndexKey;
+ MultipartUpload streamingMpu = null;
+ List<MultipartPart> streamingParts = Lists.newArrayList();
+
+ @Override
+ public CompletableFuture<OffloadHandle> streamingOffload(ManagedLedger ml,
UUID uuid, long beginLedger,
+ long beginEntry,
+ Map<String,
String> driverMetadata) {
+ this.ml = ml;
+ this.segmentInfo = new SegmentInfoImpl(uuid, beginLedger, beginEntry,
config.getDriver(), driverMetadata);
+ log.debug("begin offload with {}:{}", beginLedger, beginEntry);
+ this.offloadResult = new CompletableFuture<>();
+ blobStore = blobStores.get(config.getBlobStoreLocation());
+ streamingIndexBuilder = new StreamingOffloadIndexBlockBuilderImpl();
+ streamingDataBlockKey = segmentInfo.uuid.toString();
+ streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
+ BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
+ DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
+ Blob blob = blobBuilder.build();
+ streamingMpu = blobStore
+ .initiateMultipartUpload(config.getBucket(),
blob.getMetadata(), new PutOptions());
+
+ scheduler.chooseThread(segmentInfo).execute(() -> {
+ log.info("start offloading segment: {}", segmentInfo);
+ streamingOffloadLoop(1, 0);
+ });
+ scheduler.schedule(this::closeSegment, segmentCloseTime.toMillis(),
TimeUnit.MILLISECONDS);
+
+ return CompletableFuture.completedFuture(new OffloadHandle() {
+ @Override
+ public boolean canOffer(long size) {
+ return BlobStoreManagedLedgerOffloader.this.canOffer(size);
+ }
+
+ @Override
+ public PositionImpl lastOffered() {
+ return BlobStoreManagedLedgerOffloader.this.lastOffered();
+ }
+
+ @Override
+ public boolean offerEntry(EntryImpl entry) throws
OffloadSegmentClosedException,
+ OffloadNotConsecutiveException {
+ return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
+ }
+
+ @Override
+ public CompletableFuture<OffloadResult> getOffloadResultAsync() {
+ return
BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
+ }
+ });
+ }
+
+ private void streamingOffloadLoop(int partId, int dataObjectLength) {
+ log.debug("streaming offload loop {} {}", partId, dataObjectLength);
+ if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
+ offloadResult.complete(segmentInfo.result());
+ return;
+ }
+ final BufferedOffloadStream payloadStream;
+
+ while (offloadBuffer.isEmpty()) {
+ if (segmentInfo.isClosed()) {
+ offloadResult.complete(segmentInfo.result());
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ final Entry peek = offloadBuffer.peek();
+ //initialize payload when there is at least one entry
+ final long blockLedgerId = peek.getLedgerId();
+ final long blockEntryId = peek.getEntryId();
+ payloadStream = new BufferedOffloadStream(streamingBlockSize,
offloadBuffer, segmentInfo,
+ blockLedgerId, blockEntryId, bufferLength);
+ try {
+ streamingIndexBuilder.addLedgerMeta(blockLedgerId,
ml.getRawLedgerMetadata(blockLedgerId).get());
+
streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
+ } catch (InterruptedException | ExecutionException e) {
+ offloadResult.completeExceptionally(e);
+ return;
+ }
+ log.debug("begin upload payload");
Review comment:
+1
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,115 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
+import lombok.ToString;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadNotConsecutiveException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadSegmentClosedException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+ interface SegmentInfo {
+ boolean isClosed();
+
+ OffloadResult result();
+ }
+
+ @ToString
+ class SegmentInfoImpl implements SegmentInfo {
+ public SegmentInfoImpl(UUID uuid, long beginLedger, long beginEntry,
String driverName,
+ Map<String, String> driverMetadata) {
+ this.uuid = uuid;
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.driverName = driverName;
+ this.driverMetadata = driverMetadata;
+ }
+
+
+ public final UUID uuid;
+ public final long beginLedger;
+ public final long beginEntry;
+ public final String driverName;
+ volatile private long endLedger;
+ volatile private long endEntry;
+ volatile boolean closed = false;
+ public final Map<String, String> driverMetadata;
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void closeSegment(long endLedger, long endEntry) {
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ this.closed = true;
+ }
+
+ public OffloadResult result() {
+ return new OffloadResult(beginLedger, beginEntry, endLedger,
endEntry);
+ }
+ }
+
+
+ class OffloadResult {
+ public final long beginLedger;
+ public final long beginEntry;
+ public final long endLedger;
+ public final long endEntry;
+
+ public OffloadResult(long beginLedger, long beginEntry, long
endLedger, long endEntry) {
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ }
+ }
+
+ /**
+ * Used to store driver info, buffer entries, mark progress, etc.
+     * Create one per segment.
+ */
+ interface OffloadHandle {
+
+ /**
+     * Return true when the buffer has enough space and the ledger/entry id
is consecutive with the current one.
+ * @param size
+ * @return
+ */
+ boolean canOffer(long size);
+
+ default CompletableFuture<Boolean> asyncCanOffer(long size) {
Review comment:
You can't assume `canOffer` is always "efficient"; that depends on the
implementation. From the interface perspective, this is not the right
approach: using a default method means implementations are not required to
override it, so an implementation may provide only the sync method. If that
sync method needs time-consuming operations, this defeats the original
intention of the interface.
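In other words, the shape being argued for is roughly this sketch, where the
async variant has no default body and every implementation must decide how to
make it genuinely non-blocking:
```java
// Sketch: both methods are abstract, so an implementation whose capacity
// check is expensive cannot silently fall back to a blocking default.
interface OffloadHandle {
    boolean canOffer(long size);

    CompletableFuture<Boolean> asyncCanOffer(long size); // no default body
}
```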
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BufferedOffloadStream.java
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.SegmentInfoImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+
+@Slf4j
+public class BufferedOffloadStream extends InputStream {
+ static final int[] BLOCK_END_PADDING =
BlockAwareSegmentInputStreamImpl.BLOCK_END_PADDING;
+ private final SegmentInfoImpl segmentInfo;
+
+ private final long ledgerId;
+ private final long beginEntryId;
+ private AtomicLong bufferLength;
+ static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
+ private final long blockSize;
+ private final ConcurrentLinkedQueue<Entry> entryBuffer;
+ private final InputStream blockHead;
+ int offset = 0;
+ static int NOT_INITIALIZED = -1;
+ int validDataOffset = NOT_INITIALIZED;
+ CompositeByteBuf currentEntry;
+
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ public long getBeginEntryId() {
+ return beginEntryId;
+ }
+
+ public long getBlockSize() {
+ return blockSize;
+ }
+
+
+ public BufferedOffloadStream(int blockSize,
+ ConcurrentLinkedQueue<Entry> entryBuffer,
+ SegmentInfoImpl segmentInfo,
+ long ledgerId,
+ long beginEntryId,
+ AtomicLong bufferLength) {
+ this.ledgerId = ledgerId;
+ this.beginEntryId = beginEntryId;
+ this.blockSize = blockSize;
+ this.segmentInfo = segmentInfo;
+ this.entryBuffer = entryBuffer;
+ this.bufferLength = bufferLength;
+ this.blockHead = StreamingDataBlockHeaderImpl.of(blockSize, ledgerId,
beginEntryId)
+ .toStream();
+ }
+
+
+ @Override
+ public int read() throws IOException {
+ if (blockHead.available() > 0) {
+ offset++;
+ return blockHead.read();
+ }
+ //if current exists, use current first
+ if (currentEntry != null) {
+ if (currentEntry.readableBytes() > 0) {
+ offset += 1;
+ return currentEntry.readUnsignedByte();
+ } else {
+ currentEntry.release();
+ currentEntry = null;
+ }
+ }
+
+ if (blockSize <= offset) {
+ return -1;
+ } else if (validDataOffset != NOT_INITIALIZED) {
+ return BLOCK_END_PADDING[(offset++ - validDataOffset) %
BLOCK_END_PADDING.length];
+ }
+
+ Entry headEntry;
+
+ while ((headEntry = entryBuffer.peek()) == null) {
+ if (segmentInfo.isClosed()) {
+ if (validDataOffset == NOT_INITIALIZED) {
+ validDataOffset = offset;
+ }
+ return read();
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ log.error("sleep failed", e);
+ }
+ }
+ }
+
+ //create new block when a ledger end
+ if (headEntry.getLedgerId() != this.ledgerId) {
+ if (validDataOffset == NOT_INITIALIZED) {
+ validDataOffset = offset;
+ }
+ return read();
+ }
+
+ if (blockSize >= offset
+ + ENTRY_HEADER_SIZE
+ + headEntry.getLength()) {
+ entryBuffer.poll();
Review comment:
Should we recycle the entries after the read completes?
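A hedged sketch of the idea, assuming the entry's payload buffer is retained
into `currentEntry` before the wrapper is released (the copy step is elided
and hypothetical):
```java
// Sketch: release (recycle) the Entry wrapper once its bytes are owned by
// the block buffer, instead of leaving it to GC.
Entry entry = entryBuffer.poll();
try {
    // ... append the entry header and entry.getDataBuffer() (retained)
    //     to currentEntry ...
} finally {
    entry.release(); // safe once the payload buffer is retained elsewhere
}
```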
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,115 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
+import lombok.ToString;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadNotConsecutiveException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadSegmentClosedException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+ interface SegmentInfo {
+ boolean isClosed();
+
+ OffloadResult result();
+ }
+
+ @ToString
+ class SegmentInfoImpl implements SegmentInfo {
+ public SegmentInfoImpl(UUID uuid, long beginLedger, long beginEntry,
String driverName,
+ Map<String, String> driverMetadata) {
+ this.uuid = uuid;
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.driverName = driverName;
+ this.driverMetadata = driverMetadata;
+ }
+
+
+ public final UUID uuid;
+ public final long beginLedger;
+ public final long beginEntry;
+ public final String driverName;
+ volatile private long endLedger;
+ volatile private long endEntry;
+ volatile boolean closed = false;
+ public final Map<String, String> driverMetadata;
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void closeSegment(long endLedger, long endEntry) {
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ this.closed = true;
+ }
+
+ public OffloadResult result() {
+ return new OffloadResult(beginLedger, beginEntry, endLedger,
endEntry);
+ }
+ }
+
+
+ class OffloadResult {
+ public final long beginLedger;
+ public final long beginEntry;
+ public final long endLedger;
+ public final long endEntry;
+
+ public OffloadResult(long beginLedger, long beginEntry, long
endLedger, long endEntry) {
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ }
+ }
+
+ /**
+ * Used to store driver info, buffer entries, mark progress, etc.
+     * Create one per segment.
+ */
+ interface OffloadHandle {
+
+ /**
+     * Return true when the buffer has enough space and the ledger/entry id
is consecutive with the current one.
+ * @param size
+ * @return
+ */
+ boolean canOffer(long size);
+
+ default CompletableFuture<Boolean> asyncCanOffer(long size) {
+ return CompletableFuture.completedFuture(canOffer(size));
+ }
+
+ PositionImpl lastOffered();
Review comment:
From the interface perspective, we should use `Position`.
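That is, the handle would expose the API-level type; a one-line sketch:
```java
// Sketch: code against the abstract type; implementations can still return a
// PositionImpl under the hood.
Position lastOffered();
```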
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]