wuchong commented on code in PR #2223:
URL: https://github.com/apache/fluss/pull/2223#discussion_r2650587639


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1262,20 +1275,58 @@ private void tryProcessCommitLakeTableSnapshot(
                                 }
 
                                 // this involves IO operation (ZK), so we do it in ioExecutor
-                                lakeTableHelper.upsertLakeTable(
-                                        tableId, tablePath, lakeTableSnapshotEntry.getValue());
+                                lakeTableHelper.registerLakeTableSnapshotV1(
+                                        tableId, lakeTableSnapshotEntry.getValue());
+                                // send notify lakehouse data request to all replicas via
+                                // coordinator event
+                                coordinatorEventManager.put(
+                                        new NotifyLakeTableOffsetEvent(
+                                                lakeTableSnapshots,
+                                                commitLakeTableSnapshotData
+                                                        .getTableBucketsMaxTieredTimestamp()));
                             } catch (Exception e) {
                                 ApiError error = ApiError.fromThrowable(e);
                                 tableResp.setError(error.error().code(), error.message());
                             }
                         }
+                        callback.complete(response);
+                    } catch (Exception e) {
+                        callback.completeExceptionally(e);
+                    }
+                });
+    }
 
-                        // send notify lakehouse data request to all replicas via coordinator event
-                        coordinatorEventManager.put(
-                                new NotifyLakeTableOffsetEvent(
-                                        lakeTableSnapshots,
-                                        commitLakeTableSnapshotData
-                                                .getTableBucketsMaxTieredTimestamp()));
+    private void handleCommitLakeTableSnapshotV2(
+            CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
+            CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+        CommitLakeTableSnapshotData commitLakeTableSnapshotData =
+                commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
+        Map<Long, LakeTable.LakeSnapshotMetadata> lakeSnapshotMetadatas =
+                commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas();
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        CommitLakeTableSnapshotResponse response =
+                                new CommitLakeTableSnapshotResponse();
+                        for (Map.Entry<Long, LakeTable.LakeSnapshotMetadata>
+                                lakeSnapshotMetadataEntry : lakeSnapshotMetadatas.entrySet()) {
+                            PbCommitLakeTableSnapshotRespForTable tableResp =
+                                    response.addTableResp();
+                            long tableId = lakeSnapshotMetadataEntry.getKey();
+                            tableResp.setTableId(tableId);
+                            try {
+                                lakeTableHelper.registerLakeTableSnapshotV2(
+                                        tableId, lakeSnapshotMetadataEntry.getValue());
+                                coordinatorEventManager.put(
+                                        new NotifyLakeTableOffsetEvent(
+                                                commitLakeTableSnapshotData.getLakeTableSnapshot(),
+                                                commitLakeTableSnapshotData
+                                                        .getTableBucketsMaxTieredTimestamp()));

Review Comment:
   Moving the event-publishing logic into the `forEach` loop would cause the event to be sent multiple times, since the event is intended for all table ids, not per individual table.



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java:
##########
@@ -79,7 +79,8 @@ public enum ApiKeys {
     REMOVE_SERVER_TAG(1048, 0, 0, PUBLIC),
     REBALANCE(1049, 0, 0, PUBLIC),
     LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC),
-    CANCEL_REBALANCE(1051, 0, 0, PUBLIC);
+    CANCEL_REBALANCE(1051, 0, 0, PUBLIC),
+    PRE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE);

Review Comment:
   `PRE_LAKE_TABLE_SNAPSHOT` -> `PREPARE_LAKE_TABLE_SNAPSHOT`
   
   While `PRE` might imply "before," it's ambiguous and non-idiomatic in event or phase naming. `PREPARE` clearly conveys the intent.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1262,20 +1275,58 @@ private void tryProcessCommitLakeTableSnapshot(
                                 }
 
                                 // this involves IO operation (ZK), so we do it in ioExecutor
-                                lakeTableHelper.upsertLakeTable(
-                                        tableId, tablePath, lakeTableSnapshotEntry.getValue());
+                                lakeTableHelper.registerLakeTableSnapshotV1(
+                                        tableId, lakeTableSnapshotEntry.getValue());
+                                // send notify lakehouse data request to all replicas via
+                                // coordinator event
+                                coordinatorEventManager.put(
+                                        new NotifyLakeTableOffsetEvent(
+                                                lakeTableSnapshots,
+                                                commitLakeTableSnapshotData
+                                                        .getTableBucketsMaxTieredTimestamp()));

Review Comment:
   ditto



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -1012,4 +1014,49 @@ message PbRebalancePlanForBucket {
   optional int32 new_leader = 4;
   repeated int32 original_replicas = 5 [packed = true];
   repeated int32 new_replicas = 6 [packed = true];
+}
+
+message PbLakeTableSnapshotMetadata {
+  required int64 table_id = 1;
+  required int64 snapshot_id = 2;
+  required string tiered_bucket_offsets_file_path = 3;
+  optional string readable_bucket_offsets_file_path = 4;
+}
+
+message PbLakeTableSnapshotInfo {
+  optional int64 table_id = 1;
+  required int64 snapshot_id = 2;
+  repeated PbLakeTableOffsetForBucket buckets_req = 3;
+  // include the table path to avoid looking up table_path by table id
+  optional PbTablePath table_path = 4;
+}
+
+message PbLakeTableOffsetForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+  // Deprecated: log_start_offset is no longer used. Field number 3 is reserved for protocol compatibility.
+  // optional int64 log_start_offset = 3;
+  optional int64 log_end_offset = 4;
+  // Deprecated: partition_name is no longer used. Field number 5 is reserved for protocol compatibility.
+  // optional string partition_name = 5;
+  optional int64 max_timestamp = 6;
+}
+
+message PbPrepareLakeTableRespForTable {
+  optional int32 error_code = 1;
+  optional string error_message = 2;
+  optional int64 table_id = 3;
+  optional string lake_table_bucket_offsets_path = 4;

Review Comment:
   `lake_table_offsets_path` to align with the `TableOffsets` concept.



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java:
##########
@@ -66,6 +68,30 @@ public interface CoordinatorGateway extends RpcGateway, AdminGateway {
     CompletableFuture<CommitRemoteLogManifestResponse> commitRemoteLogManifest(
             CommitRemoteLogManifestRequest request);
 
+    /**
+     * Prepares lake table snapshots by merging them with existing snapshots and storing them to the
+     * file system.
+     *
+     * <p>This method is called during the two-phase commit process for lake table snapshots. It
+     * performs the following operations for each table in the request:
+     *
+     * <ul>
+     *   <li>Merges the new snapshot with the previous latest snapshot (if exists) to ensure
+     *       completeness
+     *   <li>Stores the merged snapshot to the remote file system. The stored file contains the log
+     *       end offset information for each bucket in the table
+     *   <li>Returns the file path where the snapshot is stored

Review Comment:
   Add an item for the second phase:
   
   ```java
     *   <li>Call {@link #commitLakeTableSnapshot(CommitLakeTableSnapshotRequest)} with the offset
     *       file path to finalize the snapshot commit to ZooKeeper in the second phase.
   ```



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java:
##########
@@ -66,6 +68,30 @@ public interface CoordinatorGateway extends RpcGateway, AdminGateway {
     CompletableFuture<CommitRemoteLogManifestResponse> commitRemoteLogManifest(
             CommitRemoteLogManifestRequest request);
 
+    /**
+     * Prepares lake table snapshots by merging them with existing snapshots and storing them to the
+     * file system.

Review Comment:
   ```suggestion
     * Prepares lake table snapshots by merging them with existing snapshots and storing them to the
     * remote file system.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
