codelipenghui commented on a change in pull request #9096:
URL: https://github.com/apache/pulsar/pull/9096#discussion_r554527391



##########
File path: managed-ledger/src/main/proto/MLDataFormats.proto
##########
@@ -28,28 +28,40 @@ message KeyValue {
 
 message OffloadDriverMetadata {
     required string name = 1;
-    repeated KeyValue properties = 2;
+  repeated KeyValue properties = 2;
 }
 
 message OffloadContext {
-    optional int64 uidMsb = 1;
-    optional int64 uidLsb = 2;
-    optional bool complete = 3;
-    optional bool bookkeeperDeleted = 4;
-    optional int64 timestamp = 5;
-    optional OffloadDriverMetadata driverMetadata = 6;
+  optional int64 uidMsb = 1;
+  optional int64 uidLsb = 2;
+  optional bool complete = 3;
+  optional bool bookkeeperDeleted = 4;
+  optional int64 timestamp = 5;
+  optional OffloadDriverMetadata driverMetadata = 6;
+  repeated OffloadSegment offloadSegment = 7;
+}
+
+message OffloadSegment {
+  optional int64 uidMsb = 1;
+  optional int64 uidLsb = 2;
+  optional bool complete = 3;
+  optional bool bookkeeperDeleted = 4;

Review comment:
       Do we need this field in the OffloadSegment? A ledger might be offloaded 
as multiple segments, so it's better to use the `bookkeeperDeleted` field in the 
`OffloadContext`.

##########
File path: managed-ledger/src/main/proto/MLDataFormats.proto
##########
@@ -28,28 +28,40 @@ message KeyValue {
 
 message OffloadDriverMetadata {
     required string name = 1;
-    repeated KeyValue properties = 2;
+  repeated KeyValue properties = 2;
 }
 
 message OffloadContext {
-    optional int64 uidMsb = 1;
-    optional int64 uidLsb = 2;
-    optional bool complete = 3;
-    optional bool bookkeeperDeleted = 4;
-    optional int64 timestamp = 5;
-    optional OffloadDriverMetadata driverMetadata = 6;
+  optional int64 uidMsb = 1;
+  optional int64 uidLsb = 2;
+  optional bool complete = 3;
+  optional bool bookkeeperDeleted = 4;
+  optional int64 timestamp = 5;
+  optional OffloadDriverMetadata driverMetadata = 6;
+  repeated OffloadSegment offloadSegment = 7;
+}
+
+message OffloadSegment {
+  optional int64 uidMsb = 1;
+  optional int64 uidLsb = 2;
+  optional bool complete = 3;
+  optional bool bookkeeperDeleted = 4;
+  optional int64 assignedTs = 5; //epoch millis

Review comment:
       ```suggestion
     optional int64 assignedTimestamp = 5; //epoch millis
   ```

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -35,6 +39,78 @@
 @InterfaceStability.Evolving
 public interface LedgerOffloader {
 
+    @ToString
+    class SegmentInfo {

Review comment:
       Do we need to expose the SegmentInfo here?

##########
File path: managed-ledger/src/main/proto/MLDataFormats.proto
##########
@@ -28,28 +28,40 @@ message KeyValue {
 
 message OffloadDriverMetadata {
     required string name = 1;
-    repeated KeyValue properties = 2;
+  repeated KeyValue properties = 2;

Review comment:
       Could you please check the formatting changes in the .proto file? It's 
better to stay consistent with the current format.

##########
File path: managed-ledger/src/main/proto/MLDataFormats.proto
##########
@@ -28,28 +28,40 @@ message KeyValue {
 
 message OffloadDriverMetadata {
     required string name = 1;
-    repeated KeyValue properties = 2;
+  repeated KeyValue properties = 2;
 }
 
 message OffloadContext {
-    optional int64 uidMsb = 1;
-    optional int64 uidLsb = 2;
-    optional bool complete = 3;
-    optional bool bookkeeperDeleted = 4;
-    optional int64 timestamp = 5;
-    optional OffloadDriverMetadata driverMetadata = 6;
+  optional int64 uidMsb = 1;
+  optional int64 uidLsb = 2;
+  optional bool complete = 3;
+  optional bool bookkeeperDeleted = 4;
+  optional int64 timestamp = 5;
+  optional OffloadDriverMetadata driverMetadata = 6;
+  repeated OffloadSegment offloadSegment = 7;
+}
+
+message OffloadSegment {
+  optional int64 uidMsb = 1;
+  optional int64 uidLsb = 2;
+  optional bool complete = 3;
+  optional bool bookkeeperDeleted = 4;
+  optional int64 assignedTs = 5; //epoch millis
+  optional int64 offloadedTs = 6; //epoch millis

Review comment:
       ```suggestion
     optional int64 offloadedTimestamp = 6; //epoch millis
   ```

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -35,6 +39,78 @@
 @InterfaceStability.Evolving
 public interface LedgerOffloader {
 
+    @ToString
+    class SegmentInfo {
+        public SegmentInfo(UUID uuid, long beginLedger, long beginEntry, 
String driverName,
+                           Map<String, String> driverMetadata) {
+            this.uuid = uuid;
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.driverName = driverName;
+            this.driverMetadata = driverMetadata;
+        }
+
+
+        public final UUID uuid;
+        public final long beginLedger;
+        public final long beginEntry;
+        public final String driverName;
+        volatile private long endLedger;
+        volatile private long endEntry;
+        volatile boolean closed = false;
+        public final Map<String, String> driverMetadata;
+
+        public boolean isClosed() {
+            return closed;
+        }
+
+        public void closeSegment(long endLedger, long endEntry) {
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+            this.closed = true;
+        }
+
+        public OffloadResult result() {
+            return new OffloadResult(beginLedger, beginEntry, endLedger, 
endEntry);
+        }
+    }
+
+
+    class OffloadResult {
+        public final long beginLedger;
+        public final long beginEntry;
+        public final long endLedger;
+        public final long endEntry;
+
+        public OffloadResult(long beginLedger, long beginEntry, long 
endLedger, long endEntry) {
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+        }
+    }
+
+    /**
+     * Used to store driver info, buffer entries, mark progress, etc.
+     * Create one per second.
+     */
+    interface OffloaderHandle {

Review comment:
       ```suggestion
       interface OffloadHandle {
   ```

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -85,6 +161,32 @@
                                     UUID uid,
                                     Map<String, String> extraMetadata);
 
+    /**
+     * Offload the passed in ledger to longterm storage.
+     * Metadata passed in is for inspection purposes only and should be stored
+     * alongside the segment data.
+     *
+     * When the returned future completes, the ledger has been persisted to the
+     * loadterm storage, so it is safe to delete the original copy in 
bookkeeper.

Review comment:
       Please check the comment on this method; I think it is not correct. 
The method just returns a future for the OffloadHandle, which does not mean that 
the data has been persisted.

##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/StreamingOffloadIndexBlock.java
##########
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import java.io.Closeable;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * The Index block abstraction used for offload a ledger to long term storage.
+ */
+@Unstable
+public interface StreamingOffloadIndexBlock extends Closeable {

Review comment:
       Can we reuse the `OffloadIndexBlock`? In my opinion, we just need to add some 
fields to the index header. Do we really need to introduce a new interface here? I 
think this will introduce more duplicated code.

##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
##########
@@ -63,6 +63,10 @@
     public static final String METADATA_FIELD_MAX_BLOCK_SIZE = 
"maxBlockSizeInBytes";
     public static final String METADATA_FIELD_READ_BUFFER_SIZE = 
"readBufferSizeInBytes";
     public static final String OFFLOADER_PROPERTY_PREFIX = 
"managedLedgerOffload";
+    public static final String MAX_SEGMENT_TIME_IN_SECOND = 
"maxSegmentTimeInSecond";
+    public static final long DEFAULT_MAX_SEGMENT_TIME_IN_SECOND = 600;
+    public static final String MAX_SEGMENT_SIZE_IN_BYTES = 
"maxSegmentSizeInBytes";
+    public static final long DEFAULT_MAX_SEGMENT_SIZE_IN_BYTES = 1024 * 1024 * 
1024;

Review comment:
       We also need to add minOffloadSegmentRolloverTimeInSeconds to prevent 
segments from rolling over too often.

##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/StreamingOffloadIndexBlockBuilder.java
##########
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload.jcloud;
+
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import 
org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import 
org.apache.bookkeeper.mledger.offload.jcloud.impl.StreamingOffloadIndexBlockBuilderImpl;
+
+/**
+ * Interface for builder of index block used for offload a ledger to long term 
storage.
+ */
+@Unstable
+@LimitedPrivate
+public interface StreamingOffloadIndexBlockBuilder {

Review comment:
       Please consider reuse the `OffloadIndexBlockBuilder`

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -35,6 +39,78 @@
 @InterfaceStability.Evolving
 public interface LedgerOffloader {
 
+    @ToString
+    class SegmentInfo {
+        public SegmentInfo(UUID uuid, long beginLedger, long beginEntry, 
String driverName,
+                           Map<String, String> driverMetadata) {
+            this.uuid = uuid;
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.driverName = driverName;
+            this.driverMetadata = driverMetadata;
+        }
+
+
+        public final UUID uuid;
+        public final long beginLedger;
+        public final long beginEntry;
+        public final String driverName;
+        volatile private long endLedger;
+        volatile private long endEntry;
+        volatile boolean closed = false;
+        public final Map<String, String> driverMetadata;
+
+        public boolean isClosed() {
+            return closed;
+        }
+
+        public void closeSegment(long endLedger, long endEntry) {
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+            this.closed = true;
+        }
+
+        public OffloadResult result() {
+            return new OffloadResult(beginLedger, beginEntry, endLedger, 
endEntry);
+        }
+    }
+
+
+    class OffloadResult {
+        public final long beginLedger;
+        public final long beginEntry;
+        public final long endLedger;
+        public final long endEntry;
+
+        public OffloadResult(long beginLedger, long beginEntry, long 
endLedger, long endEntry) {
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+        }
+    }
+
+    /**
+     * Used to store driver info, buffer entries, mark progress, etc.
+     * Create one per second.
+     */
+    interface OffloaderHandle {
+
+        /**
+         * return true when both buffer have enough size and ledger/entry id 
is next to the current one.
+         * @param size
+         * @return
+         */
+        boolean canOffer(long size);
+
+        PositionImpl lastOffered();
+
+        boolean offerEntry(EntryImpl entry) throws 
OffloadSegmentClosedException,
+                ManagedLedgerException.OffloadNotConsecutiveException;

Review comment:
       Is it possible to check for consecutive entries in the offloader? I think it's 
easier to handle in the managed ledger.

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -35,6 +39,78 @@
 @InterfaceStability.Evolving
 public interface LedgerOffloader {
 
+    @ToString
+    class SegmentInfo {
+        public SegmentInfo(UUID uuid, long beginLedger, long beginEntry, 
String driverName,
+                           Map<String, String> driverMetadata) {
+            this.uuid = uuid;
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.driverName = driverName;
+            this.driverMetadata = driverMetadata;
+        }
+
+
+        public final UUID uuid;
+        public final long beginLedger;
+        public final long beginEntry;
+        public final String driverName;
+        volatile private long endLedger;
+        volatile private long endEntry;
+        volatile boolean closed = false;
+        public final Map<String, String> driverMetadata;
+
+        public boolean isClosed() {
+            return closed;
+        }
+
+        public void closeSegment(long endLedger, long endEntry) {
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+            this.closed = true;
+        }
+
+        public OffloadResult result() {
+            return new OffloadResult(beginLedger, beginEntry, endLedger, 
endEntry);
+        }
+    }
+
+
+    class OffloadResult {
+        public final long beginLedger;
+        public final long beginEntry;
+        public final long endLedger;
+        public final long endEntry;
+
+        public OffloadResult(long beginLedger, long beginEntry, long 
endLedger, long endEntry) {
+            this.beginLedger = beginLedger;
+            this.beginEntry = beginEntry;
+            this.endLedger = endLedger;
+            this.endEntry = endEntry;
+        }
+    }
+
+    /**
+     * Used to store driver info, buffer entries, mark progress, etc.
+     * Create one per second.
+     */
+    interface OffloaderHandle {
+
+        /**
+         * return true when both buffer have enough size and ledger/entry id 
is next to the current one.
+         * @param size
+         * @return
+         */
+        boolean canOffer(long size);
+
+        PositionImpl lastOffered();
+
+        boolean offerEntry(EntryImpl entry) throws 
OffloadSegmentClosedException,

Review comment:
       It's better to also add async versions of these 3 methods. Usually, we 
should implement the sync methods on top of the async ones.

##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
##########
@@ -63,6 +63,10 @@
     public static final String METADATA_FIELD_MAX_BLOCK_SIZE = 
"maxBlockSizeInBytes";
     public static final String METADATA_FIELD_READ_BUFFER_SIZE = 
"readBufferSizeInBytes";
     public static final String OFFLOADER_PROPERTY_PREFIX = 
"managedLedgerOffload";
+    public static final String MAX_SEGMENT_TIME_IN_SECOND = 
"maxSegmentTimeInSecond";

Review comment:
       maxOffloadSegmentRolloverTimeInSeconds?

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
##########
@@ -115,6 +217,16 @@
     CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
                                             Map<String, String> 
offloadDriverMetadata);
 
+    default CompletableFuture<ReadHandle> readOffloaded(long ledgerId, 
MLDataFormats.OffloadContext ledgerContext,
+                                                        Map<String, String> 
offloadDriverMetadata) {
+        throw new UnsupportedClassVersionError();

Review comment:
       UnsupportedOperationException?

##########
File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/OffloadIndexEntry.java
##########
@@ -33,7 +33,7 @@
     /**
      * Get the entryId that this entry contains.
      */
-    long getEntryId();
+    long getFirstEntryId();

Review comment:
       Why change it to getFirstEntryId? The index always points to one entryId, 
right?

##########
File path: managed-ledger/src/main/proto/MLDataFormats.proto
##########
@@ -28,28 +28,40 @@ message KeyValue {
 
 message OffloadDriverMetadata {
     required string name = 1;
-    repeated KeyValue properties = 2;
+  repeated KeyValue properties = 2;
 }
 
 message OffloadContext {
-    optional int64 uidMsb = 1;
-    optional int64 uidLsb = 2;
-    optional bool complete = 3;
-    optional bool bookkeeperDeleted = 4;
-    optional int64 timestamp = 5;
-    optional OffloadDriverMetadata driverMetadata = 6;
+  optional int64 uidMsb = 1;
+  optional int64 uidLsb = 2;
+  optional bool complete = 3;
+  optional bool bookkeeperDeleted = 4;
+  optional int64 timestamp = 5;
+  optional OffloadDriverMetadata driverMetadata = 6;
+  repeated OffloadSegment offloadSegment = 7;
+}
+
+message OffloadSegment {
+  optional int64 uidMsb = 1;
+  optional int64 uidLsb = 2;
+  optional bool complete = 3;
+  optional bool bookkeeperDeleted = 4;
+  optional int64 assignedTs = 5; //epoch millis

Review comment:
       Could you please give a more meaningful description? `epoch millis` is confusing here.

##########
File path: managed-ledger/src/main/proto/MLDataFormats.proto
##########
@@ -28,28 +28,40 @@ message KeyValue {
 
 message OffloadDriverMetadata {
     required string name = 1;
-    repeated KeyValue properties = 2;
+  repeated KeyValue properties = 2;
 }
 
 message OffloadContext {
-    optional int64 uidMsb = 1;
-    optional int64 uidLsb = 2;
-    optional bool complete = 3;
-    optional bool bookkeeperDeleted = 4;
-    optional int64 timestamp = 5;
-    optional OffloadDriverMetadata driverMetadata = 6;
+  optional int64 uidMsb = 1;
+  optional int64 uidLsb = 2;
+  optional bool complete = 3;
+  optional bool bookkeeperDeleted = 4;
+  optional int64 timestamp = 5;
+  optional OffloadDriverMetadata driverMetadata = 6;
+  repeated OffloadSegment offloadSegment = 7;
+}
+
+message OffloadSegment {
+  optional int64 uidMsb = 1;
+  optional int64 uidLsb = 2;
+  optional bool complete = 3;
+  optional bool bookkeeperDeleted = 4;
+  optional int64 assignedTs = 5; //epoch millis
+  optional int64 offloadedTs = 6; //epoch millis

Review comment:
       Could you please give a more meaningful description? `epoch millis` is confusing here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to