wuchong commented on code in PR #2037:
URL: https://github.com/apache/fluss/pull/2037#discussion_r2625452892


##########
fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java:
##########
@@ -681,6 +683,45 @@ public static FsPath remoteKvSnapshotDir(FsPath remoteKvTabletDir, long snapshot
         return new FsPath(remoteKvTabletDir, REMOTE_KV_SNAPSHOT_DIR_PREFIX + snapshotId);
     }
 
+    /**
+     * Returns the remote path for storing lake snapshot metadata required by 
Fluss for a table.
+     *
+     * <p>The path contract:
+     *
+     * <pre>
+     * {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}
+     * </pre>
+     */
+    public static FsPath remoteLakeTableSnapshotMetadataDir(
+            String remoteDataDir, TablePath tablePath, long tableId) {
+        return new FsPath(
+                String.format(
+                        "%s/%s/%s/%s-%d",
+                        remoteDataDir,
+                        REMOTE_LAKE_DIR_NAME,
+                        tablePath.getDatabaseName(),
+                        tablePath.getTableName(),
+                        tableId));
+    }
+
+    /**
+     * Returns a remote path for storing lake snapshot metadata required by 
Fluss for a table.
+     *
+     * <p>The path contract:
+     *
+     * <pre>
+     * {$remoteLakeTableSnapshotMetadataDir}/manifest/{uuid}.manifest
+     * </pre>
+     */
+    public static FsPath remoteLakeTableSnapshotManifestPath(
+            String remoteDataDir, TablePath tablePath, long tableId) {
+        return new FsPath(
+                String.format(
+                        "%s/manifest/%s.manifest",

Review Comment:
   ```suggestion
                           "%s/metadata/%s.manifest",
   ```
   
   To keep it aligned with the log manifest path.
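
As a rough illustration of the resulting layout, here is a minimal sketch that builds both paths as plain strings. The class name, the `String`-based signatures, and the explicit `UUID` parameter are assumptions made for this sketch; the real helpers in `FlussPaths` return `FsPath`, and the `lake` directory name is taken from the Javadoc path contract quoted above.

```java
import java.util.UUID;

/** Hypothetical stand-in for the path helpers under review; plain strings replace FsPath. */
final class LakePathSketch {
    // Assumed value, read off the Javadoc contract "{$remote.data.dir}/lake/...".
    static final String REMOTE_LAKE_DIR_NAME = "lake";

    /** Builds {remoteDataDir}/lake/{databaseName}/{tableName}-{tableId}. */
    static String lakeTableSnapshotDir(
            String remoteDataDir, String databaseName, String tableName, long tableId) {
        return String.format(
                "%s/%s/%s/%s-%d",
                remoteDataDir, REMOTE_LAKE_DIR_NAME, databaseName, tableName, tableId);
    }

    /** Builds {lakeTableSnapshotDir}/metadata/{uuid}.manifest with the suggested segment. */
    static String manifestPath(String lakeTableSnapshotDir, UUID uuid) {
        return String.format("%s/metadata/%s.manifest", lakeTableSnapshotDir, uuid);
    }
}
```

Passing the UUID in, rather than generating it inside the helper, keeps the sketch deterministic and easy to test.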



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java:
##########
@@ -54,27 +47,20 @@ public long lakeSnapshotId() {
         return lakeSnapshotId;
     }
 
-    public Set<Tuple2<TableBucket, String>> tablePartitionBuckets() {
+    public Set<TableBucket> tableBuckets() {
         return logEndOffsets.keySet();
     }
 
-    public void addBucketOffsetAndTimestamp(TableBucket bucket, long offset, 
long timestamp) {
-        logEndOffsets.put(Tuple2.of(bucket, null), offset);
-        maxTimestamps.put(Tuple2.of(bucket, null), timestamp);
-    }
-
-    public void addPartitionBucketOffsetAndTimestamp(
-            TableBucket bucket, String partitionName, long offset, long 
timestamp) {
-        logEndOffsets.put(Tuple2.of(bucket, partitionName), offset);
-        maxTimestamps.put(Tuple2.of(bucket, partitionName), timestamp);
+    public void addBucketOffset(TableBucket bucket, long offset) {
+        logEndOffsets.put(bucket, offset);
     }
 
-    public long getLogEndOffset(Tuple2<TableBucket, String> bucketPartition) {
-        return logEndOffsets.get(bucketPartition);
+    public void addPartitionBucketOffset(TableBucket bucket, long offset) {

Review Comment:
   This method looks identical to `addBucketOffset`. Can we remove it?



##########
fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java:
##########
@@ -681,6 +683,45 @@ public static FsPath remoteKvSnapshotDir(FsPath 
remoteKvTabletDir, long snapshot
         return new FsPath(remoteKvTabletDir, REMOTE_KV_SNAPSHOT_DIR_PREFIX + 
snapshotId);
     }
 
+    /**
+     * Returns the remote path for storing lake snapshot metadata required by 
Fluss for a table.
+     *
+     * <p>The path contract:
+     *
+     * <pre>
+     * {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}
+     * </pre>
+     */
+    public static FsPath remoteLakeTableSnapshotMetadataDir(

Review Comment:
   ```suggestion
       public static FsPath remoteLakeTableSnapshotDir(
   ```
   
   The path doesn't include `metadata`/`manifest`, so the method name shouldn't either.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1140,6 +1147,33 @@ private void 
processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent even
                 coordinatorContext.getCoordinatorEpoch());
     }
 
+    private void processNotifyLakeTableOffsetEvent(NotifyLakeTableOffsetEvent 
event) {
+        Map<Long, LakeTableSnapshot> lakeTableSnapshots = 
event.getLakeTableSnapshots();
+        Map<TableBucket, Long> tableBucketMaxTieredTimestamps =
+                event.getTableBucketMaxTieredTimestamps();
+        coordinatorRequestBatch.newBatch();
+        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
+                lakeTableSnapshots.entrySet()) {
+            LakeTableSnapshot lakeTableSnapshot = 
lakeTableSnapshotEntry.getValue();
+            for (Map.Entry<TableBucket, Long> bucketLogEndOffsetEntry :
+                    lakeTableSnapshot.getBucketLogEndOffset().entrySet()) {
+                TableBucket tb = bucketLogEndOffsetEntry.getKey();

Review Comment:
   This can be simplified to:
   
   ```java
   for (TableBucket tb : lakeTableSnapshot.getBucketLogEndOffset().keySet()) {
                   coordinatorContext
                           .getBucketLeaderAndIsr(tb)
    ...
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -407,6 +399,11 @@ CompletableFuture<Void> stopServices() {
                 exception = ExceptionUtils.firstOrSuppressed(t, exception);
             }
 
+            if (ioExecutor != null) {
+                // shutdown io executor
+                ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);

Review Comment:
   Should we keep the original try-catch wrapper to handle potential exceptions?
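
A minimal sketch of what such a wrapper could look like, using only JDK classes. The class and method names are hypothetical, and the local `firstOrSuppressed` helper only mirrors what the name of `ExceptionUtils.firstOrSuppressed` in the quoted context suggests (keep the first failure, attach later ones as suppressed); it is not the Fluss implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; not the actual CoordinatorServer code. */
final class ShutdownSketch {
    /** Keeps the first failure and attaches later ones as suppressed (assumed semantics). */
    static Throwable firstOrSuppressed(Throwable newFailure, Throwable previous) {
        if (previous == null) {
            return newFailure;
        }
        previous.addSuppressed(newFailure);
        return previous;
    }

    /** Shuts down the executor and returns the accumulated exception, or null if none. */
    static Throwable shutdownIoExecutor(ExecutorService ioExecutor, Throwable exception) {
        try {
            if (ioExecutor != null) {
                ioExecutor.shutdown();
                // Give running tasks a bounded grace period, then force termination.
                if (!ioExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                    ioExecutor.shutdownNow();
                }
            }
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                // Restore the interrupt flag so callers can still observe it.
                Thread.currentThread().interrupt();
            }
            exception = firstOrSuppressed(t, exception);
        }
        return exception;
    }
}
```

With this shape, a failure while stopping the I/O executor does not prevent the remaining services from being stopped, and it is still reported together with earlier failures.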



##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -394,6 +394,14 @@ public class ConfigOptions {
                             "The rack for the tabletServer. This will be used 
in rack aware bucket assignment "
                                     + "for fault tolerance. Examples: `RACK1`, 
`cn-hangzhou-server10`");
 
+    public static final ConfigOption<Integer> TABLET_SERVER_IO_POOL_SIZE =
+            key("tablet-server.io-pool.size")

Review Comment:
   Can we unify the io-pool for the coordinator and tablet servers? We can rename 
this config to `server.io-pool.size` in this PR and create follow-up issues to:
   1. deprecate `kv.snapshot.transfer-thread-num` and 
`remote.log.data-transfer-thread-num` in favor of `server.io-pool.size`, 
possibly with a default of 10 threads. 
   2. deprecate `coordinator.io-pool.size` in favor of `server.io-pool.size`. 
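
One way the follow-up deprecations could behave at lookup time is sketched below with a plain `Map` instead of the Fluss configuration classes. The class name, the fallback order among the deprecated keys, and treating the suggested 10 threads as the default are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of resolving a unified option with deprecated fallbacks. */
final class IoPoolSizeSketch {
    static final String KEY = "server.io-pool.size";

    // Assumed fallback order; the review comment does not specify one.
    static final List<String> DEPRECATED_KEYS =
            Arrays.asList(
                    "coordinator.io-pool.size",
                    "kv.snapshot.transfer-thread-num",
                    "remote.log.data-transfer-thread-num");

    // Assumed default, based on the "10 threads" mentioned in the comment.
    static final int DEFAULT_SIZE = 10;

    /** Prefers the new key, then the deprecated keys in order, then the default. */
    static int resolve(Map<String, String> conf) {
        String value = conf.get(KEY);
        if (value != null) {
            return Integer.parseInt(value.trim());
        }
        for (String deprecatedKey : DEPRECATED_KEYS) {
            value = conf.get(deprecatedKey);
            if (value != null) {
                return Integer.parseInt(value.trim());
            }
        }
        return DEFAULT_SIZE;
    }
}
```

Checking the new key first means existing deployments keep working with their old settings until they migrate.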



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java:
##########
@@ -692,22 +679,20 @@ private static CommitLakeTableSnapshotRequest 
genCommitLakeTableSnapshotRequest(
             long tableId,
             @Nullable Long partitionId,
             long snapshotId,
-            Map<Integer, Long> bucketLogStartOffsets,
             Map<Integer, Long> bucketLogEndOffsets) {
         CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
                 new CommitLakeTableSnapshotRequest();
         PbLakeTableSnapshotInfo reqForTable = 
commitLakeTableSnapshotRequest.addTablesReq();
         reqForTable.setTableId(tableId);
         reqForTable.setSnapshotId(snapshotId);
-        for (Map.Entry<Integer, Long> bucketLogStartOffset : 
bucketLogStartOffsets.entrySet()) {
-            int bucketId = bucketLogStartOffset.getKey();
+        for (Map.Entry<Integer, Long> bucketLogEndOffset : 
bucketLogEndOffsets.entrySet()) {
+            int bucketId = bucketLogEndOffset.getKey();
             TableBucket tb = new TableBucket(tableId, partitionId, bucketId);
             PbLakeTableOffsetForBucket lakeTableOffsetForBucket = 
reqForTable.addBucketsReq();
             if (tb.getPartitionId() != null) {
                 lakeTableOffsetForBucket.setPartitionId(tb.getPartitionId());
             }
             lakeTableOffsetForBucket.setBucketId(tb.getBucket());
-            lakeTableOffsetForBucket.setLogStartOffset(bucketLogStartOffset.getValue());
             lakeTableOffsetForBucket.setLogEndOffset(bucketLogEndOffsets.get(bucketId));

Review Comment:
   Use `bucketLogEndOffset.getValue()` instead of 
`bucketLogEndOffsets.get(bucketId)`?
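
For reference, a small self-contained sketch of the pattern: when iterating over `entrySet()`, both the key and the value come from the entry, so no second map lookup is needed. The class and method names are hypothetical.

```java
import java.util.Map;

/** Hypothetical sketch of reading key and value from the same map entry. */
final class EntryIterationSketch {
    /** Formats each bucket as "bucketId=offset", joined by commas in iteration order. */
    static String describe(Map<Integer, Long> bucketLogEndOffsets) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Integer, Long> bucketLogEndOffset : bucketLogEndOffsets.entrySet()) {
            int bucketId = bucketLogEndOffset.getKey();
            // The value is already at hand; bucketLogEndOffsets.get(bucketId) would repeat the lookup.
            long logEndOffset = bucketLogEndOffset.getValue();
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(bucketId).append('=').append(logEndOffset);
        }
        return sb.toString();
    }
}
```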



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -462,9 +462,7 @@ message PbLakeTableSnapshotInfo {
 message PbLakeTableOffsetForBucket {
   optional int64 partition_id = 1;
   required int32 bucket_id = 2;
-  optional int64 log_start_offset = 3;
   optional int64 log_end_offset = 4;
-  optional string partition_name = 5;

Review Comment:
   Let's comment them out instead of removing them, so we know which fields are 
legacy and deprecated. 



##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -820,18 +820,9 @@ private void updateWithLakeTableSnapshot(Replica replica) 
throws Exception {
             LakeTableSnapshot lakeTableSnapshot = optLakeTableSnapshot.get();
             long snapshotId = optLakeTableSnapshot.get().getSnapshotId();
             replica.getLogTablet().updateLakeTableSnapshotId(snapshotId);
-
-            lakeTableSnapshot
-                    .getLogStartOffset(tb)
-                    
.ifPresent(replica.getLogTablet()::updateLakeLogStartOffset);
-

Review Comment:
   Currently, this method performs heavy I/O operations (e.g., reading from 
OSS/S3) while holding a lock, which can lead to potential deadlocks or 
unresponsive behavior.  
   
   From my review, this method is only used to update the lake log offset, 
specifically for the `fetchFromLake` feature. However, since `fetchFromLake` is 
planned for removal, this method will become obsolete and can be removed 
alongside it.  
   
   If my understanding is correct, could you please add a `TODO` comment in the 
`updateWithLakeTableSnapshot` method indicating that it can be removed when 
`fetchFromLake` is deprecated?



##########
fluss-server/src/test/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java:
##########
@@ -33,75 +41,103 @@ class LakeTableSnapshotJsonSerdeTest extends 
JsonSerdeTestBase<LakeTableSnapshot
 
     @Override
     protected LakeTableSnapshot[] createObjects() {
-        LakeTableSnapshot lakeTableSnapshot1 =
-                new LakeTableSnapshot(
-                        1,
-                        1L,
-                        Collections.emptyMap(),
-                        Collections.emptyMap(),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
+        // Test case 1: Empty snapshot
+        LakeTableSnapshot lakeTableSnapshot1 = new LakeTableSnapshot(1L, 
Collections.emptyMap());
 
+        // Test case 2: Non-partition table with consecutive bucket ids (0, 1, 
2)
         long tableId = 4;
-        Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
-        bucketLogStartOffset.put(new TableBucket(tableId, 1), 1L);
-        bucketLogStartOffset.put(new TableBucket(tableId, 2), 2L);
         Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
-        bucketLogEndOffset.put(new TableBucket(tableId, 1), 3L);
-        bucketLogEndOffset.put(new TableBucket(tableId, 2), 4L);
-        Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
-        bucketMaxTimestamp.put(new TableBucket(tableId, 1), 5L);
-        bucketMaxTimestamp.put(new TableBucket(tableId, 2), 6L);
-
-        LakeTableSnapshot lakeTableSnapshot2 =
-                new LakeTableSnapshot(
-                        2,
-                        tableId,
-                        bucketLogStartOffset,
-                        bucketLogEndOffset,
-                        bucketMaxTimestamp,
-                        Collections.emptyMap());
+        bucketLogEndOffset.put(new TableBucket(tableId, 0), 100L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 1), 200L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 2), 300L);
+        LakeTableSnapshot lakeTableSnapshot2 = new LakeTableSnapshot(2, 
bucketLogEndOffset);
 
+        // Test case 3: Non-partition table with missing bucket ids (0, 2, 4 - 
missing 1 and 3)
         tableId = 5;
-        bucketLogStartOffset = new HashMap<>();
-        Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
-        partitionNameIdByPartitionId.put(1L, "partition1");
-        partitionNameIdByPartitionId.put(2L, "partition2");
-        bucketLogStartOffset.put(new TableBucket(tableId, 1L, 1), 1L);
-        bucketLogStartOffset.put(new TableBucket(tableId, 2L, 1), 2L);
+        bucketLogEndOffset = new HashMap<>();
+        bucketLogEndOffset.put(new TableBucket(tableId, 0), 100L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 2), 300L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 4), 500L);
+        LakeTableSnapshot lakeTableSnapshot3 = new LakeTableSnapshot(3, 
bucketLogEndOffset);
+
+        // Test case 4: Partition table with consecutive bucket ids
+        tableId = 6;
+        bucketLogEndOffset = new HashMap<>();
+        bucketLogEndOffset.put(new TableBucket(tableId, 1L, 0), 100L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 1L, 1), 200L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 2L, 0), 300L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 2L, 1), 400L);
+        LakeTableSnapshot lakeTableSnapshot4 = new LakeTableSnapshot(4, 
bucketLogEndOffset);
 
+        // Test case 5: Partition table with missing bucket ids
+        tableId = 7;
         bucketLogEndOffset = new HashMap<>();
-        bucketLogEndOffset.put(new TableBucket(tableId, 1L, 1), 3L);
-        bucketLogEndOffset.put(new TableBucket(tableId, 2L, 1), 4L);
-
-        bucketMaxTimestamp = new HashMap<>();
-        bucketMaxTimestamp.put(new TableBucket(tableId, 1L, 1), 5L);
-        bucketMaxTimestamp.put(new TableBucket(tableId, 2L, 1), 6L);
-
-        LakeTableSnapshot lakeTableSnapshot3 =
-                new LakeTableSnapshot(
-                        3,
-                        tableId,
-                        bucketLogStartOffset,
-                        bucketLogEndOffset,
-                        bucketMaxTimestamp,
-                        partitionNameIdByPartitionId);
+        bucketLogEndOffset.put(new TableBucket(tableId, 1L, 0), 100L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 1L, 2), 300L); // missing bucket 1
+        bucketLogEndOffset.put(new TableBucket(tableId, 2L, 1), 400L);
+        bucketLogEndOffset.put(new TableBucket(tableId, 2L, 3), 600L); // missing bucket 0 and 2
+        LakeTableSnapshot lakeTableSnapshot5 = new LakeTableSnapshot(5, 
bucketLogEndOffset);
 
         return new LakeTableSnapshot[] {
-            lakeTableSnapshot1, lakeTableSnapshot2, lakeTableSnapshot3,
+            lakeTableSnapshot1,
+            lakeTableSnapshot2,
+            lakeTableSnapshot3,
+            lakeTableSnapshot4,
+            lakeTableSnapshot5,
         };
     }
 
     @Override
     protected String[] expectedJsons() {
+        // Version 2 format (compact layout):
+        // - Non-partition table: "buckets": [100, 200, 300], array index = 
bucket id, value =
+        //   log_end_offset. Missing buckets are filled with -1.
+        // - Partition table: "buckets": {"1": [100, 200], "2": [300, 400]}, 
key = partition id,
+        //   array index = bucket id, value = log_end_offset. Missing buckets 
are filled with -1.
         return new String[] {
-            "{\"version\":1,\"snapshot_id\":1,\"table_id\":1,\"buckets\":[]}",
-            "{\"version\":1,\"snapshot_id\":2,\"table_id\":4,"
-                    + 
"\"buckets\":[{\"bucket_id\":2,\"log_start_offset\":2,\"log_end_offset\":4,\"max_timestamp\":6},"
-                    + 
"{\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3,\"max_timestamp\":5}]}",
-            "{\"version\":1,\"snapshot_id\":3,\"table_id\":5,"
-                    + 
"\"buckets\":[{\"partition_id\":1,\"partition_name\":\"partition1\",\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3,\"max_timestamp\":5},"
-                    + 
"{\"partition_id\":2,\"partition_name\":\"partition2\",\"bucket_id\":1,\"log_start_offset\":2,\"log_end_offset\":4,\"max_timestamp\":6}]}"
+            // Test case 1: Empty snapshot
+            "{\"version\":2,\"snapshot_id\":1}",
+            // Test case 2: Non-partition table with consecutive bucket ids 
[0, 1, 2]
+            
"{\"version\":2,\"snapshot_id\":2,\"table_id\":4,\"buckets\":[100,200,300]}",
+            // Test case 3: Non-partition table with missing bucket ids [0, 
-1, 2, -1, 4]
+            
"{\"version\":2,\"snapshot_id\":3,\"table_id\":5,\"buckets\":[100,-1,300,-1,500]}",

Review Comment:
   Using the same property key for partitioned and non-partitioned tables makes 
the bucket offsets hard to deserialize and understand. Why not use different 
property keys for them, such as `bucket_offsets` and 
`partition_bucket_offsets`?



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/RemoteStorageCleaner.java:
##########
@@ -57,10 +61,16 @@ public RemoteStorageCleaner(Configuration configuration, 
ExecutorService ioExecu
         }
     }
 
-    public void asyncDeleteTableRemoteDir(TablePath tablePath, boolean 
isKvTable, long tableId) {
+    public void asyncDeleteTableRemoteDir(
+            TablePath tablePath, boolean isKvTable, boolean isLakeEnabled, 
long tableId) {
         if (isKvTable) {
             asyncDeleteDir(FlussPaths.remoteTableDir(remoteKvDir, tablePath, 
tableId));
         }
+        if (isLakeEnabled) {

Review Comment:
   If a table had datalake enabled, later had it turned off, and was then 
deleted, we may leave the lake snapshot metadata file behind. Maybe we can 
simply delete the metadata file regardless of whether datalake is enabled? 
(Please add a comment explaining this if we make the change.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
