sijie commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r557577627
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,115 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
+import lombok.ToString;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadNotConsecutiveException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadSegmentClosedException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+ interface SegmentInfo {
+ boolean isClosed();
+
+ OffloadResult result();
+ }
+
+ @ToString
+ class SegmentInfoImpl implements SegmentInfo {
Review comment:
Can you move the implementation outside of an interface?
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,115 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
+import lombok.ToString;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadNotConsecutiveException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadSegmentClosedException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+ interface SegmentInfo {
+ boolean isClosed();
+
+ OffloadResult result();
+ }
+
+ @ToString
+ class SegmentInfoImpl implements SegmentInfo {
+ public SegmentInfoImpl(UUID uuid, long beginLedger, long beginEntry,
String driverName,
+ Map<String, String> driverMetadata) {
+ this.uuid = uuid;
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.driverName = driverName;
+ this.driverMetadata = driverMetadata;
+ }
+
+
+ public final UUID uuid;
+ public final long beginLedger;
+ public final long beginEntry;
+ public final String driverName;
+ volatile private long endLedger;
+ volatile private long endEntry;
+ volatile boolean closed = false;
+ public final Map<String, String> driverMetadata;
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void closeSegment(long endLedger, long endEntry) {
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ this.closed = true;
+ }
+
+ public OffloadResult result() {
+ return new OffloadResult(beginLedger, beginEntry, endLedger,
endEntry);
+ }
+ }
+
+
+ class OffloadResult {
+ public final long beginLedger;
+ public final long beginEntry;
+ public final long endLedger;
+ public final long endEntry;
+
+ public OffloadResult(long beginLedger, long beginEntry, long
endLedger, long endEntry) {
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ }
+ }
+
+ /**
+ * Used to store driver info, buffer entries, mark progress, etc.
+ * Create one per second.
Review comment:
What does "create one per second" mean?
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/StreamingOffloadIndexBlock.java
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import java.io.Closeable;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * The Index block abstraction used for offload a ledger to long term storage.
+ */
+@Unstable
+public interface StreamingOffloadIndexBlock extends Closeable {
+
+ /**
+ * Get the content of the index block as InputStream.
+ * Read out in format:
+ * | index_magic_header | index_block_len | index_entry_count |
+ * | data_object_size | segment_metadata_length | segment metadata |
index entries ... |
+ */
+ IndexInputStream toStream() throws IOException;
+
+ /**
+ * Get the related OffloadIndexEntry that contains the given
messageEntryId.
+ *
+ * @param messageEntryId
+ * the entry id of message
+ * @return the offload index entry
+ */
+ OffloadIndexEntry getIndexEntryForEntry(long ledgerId, long
messageEntryId) throws IOException;
+
+ public long getStartEntryId(long ledgerId);
+
+ /**
+ * Get the entry count that contained in this index Block.
+ */
+ int getEntryCount();
+
+ /**
+ * Get LedgerMetadata.
+ * @return
+ */
+ LedgerMetadata getLedgerMetadata(long ledgerId);
+
+ /**
+ * Get the total size of the data object.
+ */
+ long getDataObjectLength();
+
+ /**
+ * Get the length of the header in the blocks in the data object.
+ */
+ long getDataBlockHeaderLength();
+
+ /**
+ * An input stream which knows the size of the stream upfront.
+ */
+ class IndexInputStream extends FilterInputStream {
Review comment:
Can we move the implementation out of this interface?
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,115 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
+import lombok.ToString;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadNotConsecutiveException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadSegmentClosedException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+ interface SegmentInfo {
+ boolean isClosed();
+
+ OffloadResult result();
+ }
+
+ @ToString
+ class SegmentInfoImpl implements SegmentInfo {
+ public SegmentInfoImpl(UUID uuid, long beginLedger, long beginEntry,
String driverName,
+ Map<String, String> driverMetadata) {
+ this.uuid = uuid;
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.driverName = driverName;
+ this.driverMetadata = driverMetadata;
+ }
+
+
+ public final UUID uuid;
+ public final long beginLedger;
+ public final long beginEntry;
+ public final String driverName;
+ volatile private long endLedger;
+ volatile private long endEntry;
+ volatile boolean closed = false;
+ public final Map<String, String> driverMetadata;
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void closeSegment(long endLedger, long endEntry) {
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ this.closed = true;
+ }
+
+ public OffloadResult result() {
+ return new OffloadResult(beginLedger, beginEntry, endLedger,
endEntry);
+ }
+ }
+
+
+ class OffloadResult {
+ public final long beginLedger;
+ public final long beginEntry;
+ public final long endLedger;
+ public final long endEntry;
+
+ public OffloadResult(long beginLedger, long beginEntry, long
endLedger, long endEntry) {
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ }
+ }
+
+ /**
+ * Used to store driver info, buffer entries, mark progress, etc.
+ * Create one per second.
+ */
+ interface OffloadHandle {
+
+ /**
+ * return true when both buffer have enough size and ledger/entry id
is next to the current one.
+ * @param size
+ * @return
+ */
+ boolean canOffer(long size);
+
+ default CompletableFuture<Boolean> asyncCanOffer(long size) {
+ return CompletableFuture.completedFuture(canOffer(size));
+ }
+
+ PositionImpl lastOffered();
Review comment:
`PositionImpl` is an implementation. Should this be `Position`?
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,115 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
+import lombok.ToString;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadNotConsecutiveException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadSegmentClosedException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+ interface SegmentInfo {
+ boolean isClosed();
+
+ OffloadResult result();
+ }
+
+ @ToString
+ class SegmentInfoImpl implements SegmentInfo {
+ public SegmentInfoImpl(UUID uuid, long beginLedger, long beginEntry,
String driverName,
+ Map<String, String> driverMetadata) {
+ this.uuid = uuid;
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.driverName = driverName;
+ this.driverMetadata = driverMetadata;
+ }
+
+
+ public final UUID uuid;
+ public final long beginLedger;
+ public final long beginEntry;
+ public final String driverName;
+ volatile private long endLedger;
+ volatile private long endEntry;
+ volatile boolean closed = false;
+ public final Map<String, String> driverMetadata;
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void closeSegment(long endLedger, long endEntry) {
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ this.closed = true;
+ }
+
+ public OffloadResult result() {
+ return new OffloadResult(beginLedger, beginEntry, endLedger,
endEntry);
+ }
+ }
+
+
+ class OffloadResult {
+ public final long beginLedger;
+ public final long beginEntry;
+ public final long endLedger;
+ public final long endEntry;
+
+ public OffloadResult(long beginLedger, long beginEntry, long
endLedger, long endEntry) {
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ }
+ }
+
+ /**
+ * Used to store driver info, buffer entries, mark progress, etc.
+ * Create one per second.
+ */
+ interface OffloadHandle {
+
+ /**
+ * return true when both buffer have enough size and ledger/entry id
is next to the current one.
+ * @param size
+ * @return
+ */
+ boolean canOffer(long size);
+
+ default CompletableFuture<Boolean> asyncCanOffer(long size) {
+ return CompletableFuture.completedFuture(canOffer(size));
+ }
+
+ PositionImpl lastOffered();
+
+ default CompletableFuture<PositionImpl> asyncLastOffered() {
+ return CompletableFuture.completedFuture(lastOffered());
+ }
+
+ boolean offerEntry(EntryImpl entry) throws
OffloadSegmentClosedException,
Review comment:
Should this be `EntryImpl` or `Entry`?
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
##########
@@ -595,4 +594,10 @@ void asyncSetProperties(Map<String, String> properties,
final AsyncCallbacks.Upd
* Get the ManagedLedgerInterceptor for ManagedLedger.
* */
ManagedLedgerInterceptor getManagedLedgerInterceptor();
+
+ /**
+ * Get basic ledger summary after the ledger is closed.
+ * will got exception if corresponding ledger was not closed when the
method called.
+ */
+ CompletableFuture<LedgerInfo> getClosedLedgerInfo(long ledgerId) throws
ManagedLedgerException;
Review comment:
If this method is designed to be async, the exception should be
delivered through the returned `CompletableFuture` instead of being thrown.
As written, the interface mixes async semantics with sync semantics.
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingBlobStoreBackedReadHandleImpl.java
##########
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.val;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlock;
+import
org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlockBuilder;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingBlobStoreBackedReadHandleImpl implements ReadHandle {
+ private static final Logger log =
LoggerFactory.getLogger(StreamingBlobStoreBackedReadHandleImpl.class);
+
+ private final long ledgerId;
+ private final List<StreamingOffloadIndexBlock> indices;
+ private final List<BackedInputStream> inputStreams;
+ private final List<DataInputStream> dataStreams;
+ private final ExecutorService executor;
+
+ static class GroupedReader {
+ long ledgerId;
+ long firstEntry;
+ long lastEntry;
+ StreamingOffloadIndexBlock index;
+ BackedInputStream inputStream;
+ DataInputStream dataStream;
+
+ public GroupedReader(long ledgerId, long firstEntry, long lastEntry,
+ StreamingOffloadIndexBlock index,
+ BackedInputStream inputStream, DataInputStream
dataStream) {
+ this.ledgerId = ledgerId;
+ this.firstEntry = firstEntry;
+ this.lastEntry = lastEntry;
+ this.index = index;
+ this.inputStream = inputStream;
+ this.dataStream = dataStream;
+ }
+ }
+
+ private StreamingBlobStoreBackedReadHandleImpl(long ledgerId,
List<StreamingOffloadIndexBlock> indices,
+ List<BackedInputStream>
inputStreams,
+ ExecutorService executor) {
+ this.ledgerId = ledgerId;
+ this.indices = indices;
+ this.inputStreams = inputStreams;
+ this.dataStreams = new LinkedList<>();
+ for (BackedInputStream inputStream : inputStreams) {
+ dataStreams.add(new DataInputStream(inputStream));
+ }
+ this.executor = executor;
+ }
+
+ @Override
+ public long getId() {
+ return ledgerId;
+ }
+
+ @Override
+ public LedgerMetadata getLedgerMetadata() {
+ return indices.get(0).getLedgerMetadata(ledgerId);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ executor.submit(() -> {
+ try {
+ for (StreamingOffloadIndexBlock indexBlock : indices) {
+ indexBlock.close();
+ }
+ for (BackedInputStream inputStream : inputStreams) {
+ inputStream.close();
+ }
+ promise.complete(null);
+ } catch (IOException t) {
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry) {
+ log.debug("Ledger {}: reading {} - {}", getId(), firstEntry,
lastEntry);
Review comment:
Put `log.debug` into `if (log.isDebugEnabled()) { ... }`
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -85,6 +181,34 @@
UUID uid,
Map<String, String> extraMetadata);
+ /**
+ * Begin offload the passed in ledgers to longterm storage, it will finish
+ * when a segment reached it's size or time.
+ * Metadata passed in is for inspection purposes only and should be stored
+ * alongside the segment data.
+ *
+ * When the returned OffloaderHandle.getOffloadResultAsync completes, the
corresponding
+ * ledgers has been persisted to the
+ * loadterm storage, so it is safe to delete the original copy in
bookkeeper.
Review comment:
```suggestion
* longterm storage, so it is safe to delete the original copy in
bookkeeper.
```
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,115 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
+import lombok.ToString;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadNotConsecutiveException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadSegmentClosedException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+ interface SegmentInfo {
+ boolean isClosed();
+
+ OffloadResult result();
+ }
+
+ @ToString
+ class SegmentInfoImpl implements SegmentInfo {
+ public SegmentInfoImpl(UUID uuid, long beginLedger, long beginEntry,
String driverName,
+ Map<String, String> driverMetadata) {
+ this.uuid = uuid;
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.driverName = driverName;
+ this.driverMetadata = driverMetadata;
+ }
+
+
+ public final UUID uuid;
+ public final long beginLedger;
+ public final long beginEntry;
+ public final String driverName;
+ volatile private long endLedger;
+ volatile private long endEntry;
+ volatile boolean closed = false;
+ public final Map<String, String> driverMetadata;
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void closeSegment(long endLedger, long endEntry) {
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ this.closed = true;
+ }
+
+ public OffloadResult result() {
+ return new OffloadResult(beginLedger, beginEntry, endLedger,
endEntry);
+ }
+ }
+
+
+ class OffloadResult {
+ public final long beginLedger;
+ public final long beginEntry;
+ public final long endLedger;
+ public final long endEntry;
+
+ public OffloadResult(long beginLedger, long beginEntry, long
endLedger, long endEntry) {
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ }
+ }
+
+ /**
+ * Used to store driver info, buffer entries, mark progress, etc.
+ * Create one per second.
+ */
+ interface OffloadHandle {
+
+ /**
+ * return true when both buffer have enough size and ledger/entry id
is next to the current one.
+ * @param size
+ * @return
+ */
+ boolean canOffer(long size);
+
+ default CompletableFuture<Boolean> asyncCanOffer(long size) {
Review comment:
It is a bit strange to implement an async method on top of a sync method. We
usually implement it the other way around: the sync method wraps the async one.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1685,6 +1686,20 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
return getLedgerHandle(ledgerId).thenApply(rh ->
rh.getLedgerMetadata().toSafeString());
}
+ @Override
+ public CompletableFuture<LedgerInfo> getClosedLedgerInfo(long ledgerId)
throws ManagedLedgerException {
+ final LedgerInfo ledgerInfo = ledgers.get(ledgerId);
+ if (ledgerInfo == null) {
+ throw new ManagedLedgerException(
+ Strings.lenientFormat("ledger with id %s not found",
ledgerId));
+ } else if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
Review comment:
What happens if the ledger is closed with zero entries?
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
##########
@@ -63,6 +63,12 @@
public static final String METADATA_FIELD_MAX_BLOCK_SIZE =
"maxBlockSizeInBytes";
public static final String METADATA_FIELD_READ_BUFFER_SIZE =
"readBufferSizeInBytes";
public static final String OFFLOADER_PROPERTY_PREFIX =
"managedLedgerOffload";
+ public static final String MAX_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC =
"maxOffloadSegmentRolloverTimeInSeconds";
+ public static final String MIN_OFFLOAD_SEGMENT_ROLLOVER_TIME_SEC =
"minOffloadSegmentRolloverTimeInSeconds";
+ public static final long DEFAULT_MAX_SEGMENT_TIME_IN_SECOND = 600;
+ public static final long DEFAULT_MIN_SEGMENT_TIME_IN_SECOND = 0;
+ public static final String MAX_SEGMENT_SIZE_IN_BYTES =
"maxSegmentSizeInBytes";
Review comment:
Can you rename it to `maxOffloadSegmentSizeInBytes`?
##########
File path:
tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderBase.java
##########
@@ -82,7 +71,11 @@ protected static MockZooKeeper createMockZooKeeper() throws
Exception {
CreateMode.PERSISTENT);
return zk;
}
-
+
+ protected static MockManagedLedger createMockManagedLedger() {
Review comment:
Makes sense now.
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingBlobStoreBackedReadHandleImpl.java
##########
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.val;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlock;
+import
org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlockBuilder;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingBlobStoreBackedReadHandleImpl implements ReadHandle {
+ private static final Logger log =
LoggerFactory.getLogger(StreamingBlobStoreBackedReadHandleImpl.class);
+
+ private final long ledgerId;
+ private final List<StreamingOffloadIndexBlock> indices;
+ private final List<BackedInputStream> inputStreams;
+ private final List<DataInputStream> dataStreams;
+ private final ExecutorService executor;
+
+ static class GroupedReader {
+ long ledgerId;
+ long firstEntry;
+ long lastEntry;
+ StreamingOffloadIndexBlock index;
+ BackedInputStream inputStream;
+ DataInputStream dataStream;
+
+ public GroupedReader(long ledgerId, long firstEntry, long lastEntry,
+ StreamingOffloadIndexBlock index,
+ BackedInputStream inputStream, DataInputStream
dataStream) {
+ this.ledgerId = ledgerId;
+ this.firstEntry = firstEntry;
+ this.lastEntry = lastEntry;
+ this.index = index;
+ this.inputStream = inputStream;
+ this.dataStream = dataStream;
+ }
+ }
+
+ private StreamingBlobStoreBackedReadHandleImpl(long ledgerId,
List<StreamingOffloadIndexBlock> indices,
+ List<BackedInputStream>
inputStreams,
+ ExecutorService executor) {
+ this.ledgerId = ledgerId;
+ this.indices = indices;
+ this.inputStreams = inputStreams;
+ this.dataStreams = new LinkedList<>();
+ for (BackedInputStream inputStream : inputStreams) {
+ dataStreams.add(new DataInputStream(inputStream));
+ }
+ this.executor = executor;
+ }
+
+ @Override
+ public long getId() {
+ return ledgerId;
+ }
+
+ @Override
+ public LedgerMetadata getLedgerMetadata() {
+ return indices.get(0).getLedgerMetadata(ledgerId);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ executor.submit(() -> {
+ try {
+ for (StreamingOffloadIndexBlock indexBlock : indices) {
+ indexBlock.close();
+ }
+ for (BackedInputStream inputStream : inputStreams) {
+ inputStream.close();
+ }
+ promise.complete(null);
+ } catch (IOException t) {
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry) {
+ log.debug("Ledger {}: reading {} - {}", getId(), firstEntry,
lastEntry);
+ CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
+ if (firstEntry > lastEntry
+ || firstEntry < 0
+ || lastEntry > getLastAddConfirmed()) {
+ promise.completeExceptionally(new
BKException.BKIncorrectParameterException());
+ return promise;
+ }
+ executor.submit(() -> {
+ List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+ List<GroupedReader> groupedReaders = null;
+ try {
+ groupedReaders = getGroupedReader(firstEntry, lastEntry);
+ } catch (Exception e) {
+ promise.completeExceptionally(e);
+ return;
+ }
+
+ for (GroupedReader groupedReader : groupedReaders) {
+ long entriesToRead = (groupedReader.lastEntry -
groupedReader.firstEntry) + 1;
+ long nextExpectedId = groupedReader.firstEntry;
+ try {
+ while (entriesToRead > 0) {
+ int length = groupedReader.dataStream.readInt();
+ if (length < 0) { // hit padding or new block
+ groupedReader.inputStream
+ .seek(groupedReader.index
+
.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+ .getDataOffset());
+ continue;
+ }
+ long entryId = groupedReader.dataStream.readLong();
+
+ if (entryId == nextExpectedId) {
+ ByteBuf buf =
PulsarByteBufAllocator.DEFAULT.buffer(length, length);
+ entries.add(LedgerEntryImpl.create(ledgerId,
entryId, length, buf));
+ int toWrite = length;
+ while (toWrite > 0) {
+ toWrite -=
buf.writeBytes(groupedReader.dataStream, toWrite);
+ }
+ entriesToRead--;
+ nextExpectedId++;
+ } else if (entryId > nextExpectedId) {
+ groupedReader.inputStream
+ .seek(groupedReader.index
+
.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+ .getDataOffset());
+ continue;
+ } else if (entryId < nextExpectedId
+ &&
!groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId,
nextExpectedId)
+ .equals(
+
groupedReader.index.getIndexEntryForEntry(groupedReader.ledgerId, entryId))) {
+ groupedReader.inputStream
+ .seek(groupedReader.index
+
.getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+ .getDataOffset());
+ continue;
+ } else if (entryId > groupedReader.lastEntry) {
+ log.info("Expected to read {}, but read {}, which
is greater than last entry {}",
+ nextExpectedId, entryId,
groupedReader.lastEntry);
+ throw new
BKException.BKUnexpectedConditionException();
+ } else {
+ val skipped =
groupedReader.inputStream.skip(length);
+ log.info("Skipped {} bytes.", skipped);
Review comment:
Should this be `info` or `debug`?
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/StreamingOffloadIndexBlock.java
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import java.io.Closeable;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * The Index block abstraction used for offload a ledger to long term storage.
+ */
+@Unstable
+public interface StreamingOffloadIndexBlock extends Closeable {
Review comment:
okay works for me.
##########
File path:
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/StreamingBlobStoreBackedReadHandleImpl.java
##########
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud.impl;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import lombok.val;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
+import org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlock;
+import
org.apache.bookkeeper.mledger.offload.jcloud.StreamingOffloadIndexBlockBuilder;
+import
org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.domain.Blob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingBlobStoreBackedReadHandleImpl implements ReadHandle {
+ private static final Logger log =
LoggerFactory.getLogger(StreamingBlobStoreBackedReadHandleImpl.class);
+
+ private final long ledgerId;
+ private final List<StreamingOffloadIndexBlock> indices;
+ private final List<BackedInputStream> inputStreams;
+ private final List<DataInputStream> dataStreams;
+ private final ExecutorService executor;
+
+ static class GroupedReader {
+ long ledgerId;
+ long firstEntry;
+ long lastEntry;
+ StreamingOffloadIndexBlock index;
+ BackedInputStream inputStream;
+ DataInputStream dataStream;
+
+ public GroupedReader(long ledgerId, long firstEntry, long lastEntry,
+ StreamingOffloadIndexBlock index,
+ BackedInputStream inputStream, DataInputStream
dataStream) {
+ this.ledgerId = ledgerId;
+ this.firstEntry = firstEntry;
+ this.lastEntry = lastEntry;
+ this.index = index;
+ this.inputStream = inputStream;
+ this.dataStream = dataStream;
+ }
+ }
+
+ private StreamingBlobStoreBackedReadHandleImpl(long ledgerId,
List<StreamingOffloadIndexBlock> indices,
+ List<BackedInputStream>
inputStreams,
+ ExecutorService executor) {
+ this.ledgerId = ledgerId;
+ this.indices = indices;
+ this.inputStreams = inputStreams;
+ this.dataStreams = new LinkedList<>();
+ for (BackedInputStream inputStream : inputStreams) {
+ dataStreams.add(new DataInputStream(inputStream));
+ }
+ this.executor = executor;
+ }
+
+ @Override
+ public long getId() {
+ return ledgerId;
+ }
+
+ @Override
+ public LedgerMetadata getLedgerMetadata() {
+ return indices.get(0).getLedgerMetadata(ledgerId);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ executor.submit(() -> {
+ try {
+ for (StreamingOffloadIndexBlock indexBlock : indices) {
+ indexBlock.close();
+ }
+ for (BackedInputStream inputStream : inputStreams) {
+ inputStream.close();
+ }
+ promise.complete(null);
+ } catch (IOException t) {
+ promise.completeExceptionally(t);
+ }
+ });
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long
lastEntry) {
+ log.debug("Ledger {}: reading {} - {}", getId(), firstEntry,
lastEntry);
Review comment:
Please address all the occurrences.
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -22,19 +22,115 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-
+import lombok.ToString;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadNotConsecutiveException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.OffloadSegmentClosedException;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
- * Interface for offloading ledgers to long-term storage
+ * Interface for offloading ledgers to long-term storage.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface LedgerOffloader {
+ interface SegmentInfo {
+ boolean isClosed();
+
+ OffloadResult result();
+ }
+
+ @ToString
+ class SegmentInfoImpl implements SegmentInfo {
+ public SegmentInfoImpl(UUID uuid, long beginLedger, long beginEntry,
String driverName,
+ Map<String, String> driverMetadata) {
+ this.uuid = uuid;
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.driverName = driverName;
+ this.driverMetadata = driverMetadata;
+ }
+
+
+ public final UUID uuid;
+ public final long beginLedger;
+ public final long beginEntry;
+ public final String driverName;
+ volatile private long endLedger;
+ volatile private long endEntry;
+ volatile boolean closed = false;
+ public final Map<String, String> driverMetadata;
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public void closeSegment(long endLedger, long endEntry) {
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ this.closed = true;
+ }
+
+ public OffloadResult result() {
+ return new OffloadResult(beginLedger, beginEntry, endLedger,
endEntry);
+ }
+ }
+
+
+ class OffloadResult {
+ public final long beginLedger;
+ public final long beginEntry;
+ public final long endLedger;
+ public final long endEntry;
+
+ public OffloadResult(long beginLedger, long beginEntry, long
endLedger, long endEntry) {
+ this.beginLedger = beginLedger;
+ this.beginEntry = beginEntry;
+ this.endLedger = endLedger;
+ this.endEntry = endEntry;
+ }
+ }
+
+ /**
+ * Used to store driver info, buffer entries, mark progress, etc.
+ * Create one per second.
+ */
+ interface OffloadHandle {
Review comment:
Don't we need a `close` method to seal the offloaded segment?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org