Copilot commented on code in PR #2223:
URL: https://github.com/apache/fluss/pull/2223#discussion_r2639835707


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -323,33 +285,45 @@ private void checkFlussNotMissingLakeSnapshot(
         // known lake snapshot, which means the data already has been committed to lake,
         // not to commit to lake to avoid data duplicated
         if (missingCommittedSnapshot != null) {
-            if (missingCommittedSnapshot.getSnapshotProperties() == null
-                    || missingCommittedSnapshot
-                                    .getSnapshotProperties()
-                                    .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)
-                            == null) {
+            String lakeSnapshotOffsetPath =
+                    missingCommittedSnapshot
+                            .getSnapshotProperties()
+                            .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+
+            // should only will happen in v0.7 which won't put offsets info
+            // to properties
+            if (lakeSnapshotOffsetPath == null) {
                 throw new IllegalStateException(
                         String.format(
-                                "Missing required log offsets property '%s' in lake snapshot %d for table: ‘tablePath=%s, tableId=%d’. "
-                                        + "This property is required to commit the missing snapshot to Fluss. "
-                                        + "The snapshot may have been created by an older version of Fluss that did not store this information, "
-                                        + "or the snapshot properties may be corrupted.",
+                                "Can't find %s field from snapshot property.",
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
+            }
+
+            // the fluss-offsets will be a json string if it's tiered be v0.8,
+            // since the code path should happen rarely, not to consider back compatibility
+            // throw IllegalStateException directly
+            String trimmedPath = lakeSnapshotOffsetPath.trim();
+            if (trimmedPath.startsWith("{")) {
+                throw new IllegalStateException(
+                        String.format(
+                                "The %s field in snapshot property is a JSON string (tiered by v0.8), "
+                                        + "which is not supported to restore. Snapshot ID: %d, Table: {tablePath=%s, tableId=%d}.",
                                 FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
                                 missingCommittedSnapshot.getLakeSnapshotId(),
                                 tablePath,
                                 tableId));
             }

Review Comment:
   The check for JSON string format at line 306 is fragile. It only checks if 
the string starts with an opening brace, which could incorrectly identify 
legitimate file paths that happen to start with that character. Consider using 
a more robust detection method, such as checking for common path prefixes or 
patterns, or attempting to parse it as JSON and catching the exception.
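   A minimal sketch of the parse-and-catch approach (a hypothetical helper, not 
existing Fluss code; it assumes a Jackson `ObjectMapper` is available on the 
classpath):
   ```java
   import com.fasterxml.jackson.databind.ObjectMapper;

   // Treat the property as the v0.8 JSON format only if it actually parses as a
   // JSON object; plain file paths fall through to the normal path handling.
   final class LakeSnapshotOffsetFormat {
       private static final ObjectMapper MAPPER = new ObjectMapper();

       static boolean isJsonOffsets(String value) {
           String trimmed = value.trim();
           if (!trimmed.startsWith("{")) {
               return false;
           }
           try {
               return MAPPER.readTree(trimmed).isObject();
           } catch (Exception e) {
               // starts with '{' but is not valid JSON, e.g. an unusual file path
               return false;
           }
       }
   }
   ```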



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1224,6 +1225,48 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
     private void tryProcessCommitLakeTableSnapshot(
             CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
             CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+        CommitLakeTableSnapshotData commitLakeTableSnapshotData =
+                commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
+        if (commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas().isEmpty()) {
+            handleCommitLakeTableSnapshotV1(commitLakeTableSnapshotEvent, callback);
+        } else {
+            Map<Long, LakeTable.LakeSnapshotMetadata> lakeSnapshotMetadatas =
+                    commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas();
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            CommitLakeTableSnapshotResponse response =
+                                    new CommitLakeTableSnapshotResponse();
+                            for (Map.Entry<Long, LakeTable.LakeSnapshotMetadata>
+                                    lakeSnapshotMetadataEntry : lakeSnapshotMetadatas.entrySet()) {
+                                PbCommitLakeTableSnapshotRespForTable tableResp =
+                                        response.addTableResp();
+                                long tableId = lakeSnapshotMetadataEntry.getKey();
+                                tableResp.setTableId(tableId);
+                                try {
+                                    lakeTableHelper.addLakeTableSnapshotMetadata(
+                                            tableId, lakeSnapshotMetadataEntry.getValue());
+                                } catch (Exception e) {
+                                    ApiError error = ApiError.fromThrowable(e);
+                                    tableResp.setError(error.error().code(), error.message());
+                                }
+                                coordinatorEventManager.put(
+                                        new NotifyLakeTableOffsetEvent(
+                                                commitLakeTableSnapshotData.getLakeTableSnapshot(),
+                                                commitLakeTableSnapshotData
+                                                        .getTableBucketsMaxTieredTimestamp()));
+                            }

Review Comment:
   The NotifyLakeTableOffsetEvent should be sent once after processing all 
tables, not inside the for loop for each table. This will cause multiple 
redundant events to be sent to the coordinator event manager, each containing 
the same data for all tables. Move this call outside the for loop, after line 
1258.
   ```suggestion
                            }
                            coordinatorEventManager.put(
                                    new NotifyLakeTableOffsetEvent(
                                            commitLakeTableSnapshotData.getLakeTableSnapshot(),
                                            commitLakeTableSnapshotData
                                                    .getTableBucketsMaxTieredTimestamp()));
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java:
##########
@@ -59,49 +70,149 @@ public void open() {
                        metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
     }
 
-    void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
+    String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets)
+            throws IOException {
+        PbPrepareCommitLakeTableRespForTable prepareCommitResp = null;
+        Exception exception = null;
         try {
-            CommitLakeTableSnapshotRequest request =
-                    toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
-            coordinatorGateway.commitLakeTableSnapshot(request).get();
+            PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
+                    toPrepareCommitLakeTableSnapshotRequest(tableId, tablePath, logEndOffsets);
+            PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
+                    coordinatorGateway
+                            .prepareCommitLakeTableSnapshot(prepareCommitLakeTableSnapshotRequest)
+                            .get();
+            List<PbPrepareCommitLakeTableRespForTable> pbPrepareCommitLakeTableRespForTables =
+                    prepareCommitLakeTableSnapshotResponse.getPrepareCommitLakeTableRespsList();
+            checkState(pbPrepareCommitLakeTableRespForTables.size() == 1);
+            prepareCommitResp = pbPrepareCommitLakeTableRespForTables.get(0);
+            if (prepareCommitResp.hasErrorCode()) {
+                exception = ApiError.fromErrorMessage(prepareCommitResp).exception();
+            }
         } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
             throw new IOException(
                     String.format(
-                            "Fail to commit table lake snapshot %s to Fluss.",
-                            flussTableLakeSnapshot),
-                    ExceptionUtils.stripExecutionException(e));
+                            "Fail to prepare commit table lake snapshot for %s to Fluss.",
+                            tablePath),
+                    ExceptionUtils.stripExecutionException(exception));
         }
+        return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();
     }
 
-    public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndOffsets)
+    void commit(
+            long tableId,
+            long lakeSnapshotId,
+            String lakeSnapshotPath,
+            Map<TableBucket, Long> logEndOffsets,
+            Map<TableBucket, Long> logMaxTieredTimestamps)
             throws IOException {
-        // construct lake snapshot to commit to Fluss
-        FlussTableLakeSnapshot flussTableLakeSnapshot =
-                new FlussTableLakeSnapshot(tableId, snapshotId);
-        for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
-            flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
+        Exception exception = null;
+        try {
+            CommitLakeTableSnapshotRequest request =
+                    toCommitLakeTableSnapshotRequest(
+                            tableId,
+                            lakeSnapshotId,
+                            lakeSnapshotPath,
+                            logEndOffsets,
+                            logMaxTieredTimestamps);
+            List<PbCommitLakeTableSnapshotRespForTable> commitLakeTableSnapshotRespForTables =
+                    coordinatorGateway.commitLakeTableSnapshot(request).get().getTableRespsList();
+            checkState(commitLakeTableSnapshotRespForTables.size() == 1);
+            PbCommitLakeTableSnapshotRespForTable commitLakeTableSnapshotRes =
+                    commitLakeTableSnapshotRespForTables.get(0);
+            if (commitLakeTableSnapshotRes.hasErrorCode()) {
+                exception = ApiError.fromErrorMessage(commitLakeTableSnapshotRes).exception();
+            }
+        } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
+            throw new IOException(
+                    String.format(
+                            "Fail to commit table lake snapshot id %d of table %d to Fluss.",
+                            lakeSnapshotId, tableId),
+                    ExceptionUtils.stripExecutionException(exception));
         }
-        commit(flussTableLakeSnapshot);
     }
 
-    private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
-            FlussTableLakeSnapshot flussTableLakeSnapshot) {
-        CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
-                new CommitLakeTableSnapshotRequest();
+    private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRequest(
+            long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets) {
+        PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
+                new PrepareCommitLakeTableSnapshotRequest();
         PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
-                commitLakeTableSnapshotRequest.addTablesReq();
+                prepareCommitLakeTableSnapshotRequest.addTablesReq();
+        pbLakeTableSnapshotInfo.setTableId(tableId);
 
-        pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
-        pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
-        for (TableBucket tableBucket : flussTableLakeSnapshot.tableBuckets()) {
+        // in prepare phase, we don't know the snapshot id,
+        // set -1 since the field is required
+        pbLakeTableSnapshotInfo.setSnapshotId(-1L);
+        for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
             PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
                     pbLakeTableSnapshotInfo.addBucketsReq();
-            long endOffset = flussTableLakeSnapshot.getLogEndOffset(tableBucket);
+            TableBucket tableBucket = logEndOffsetEntry.getKey();
+            pbLakeTableSnapshotInfo
+                    .setTablePath()
+                    .setDatabaseName(tablePath.getDatabaseName())
+                    .setTableName(tablePath.getTableName());

Review Comment:
   The table path is set repeatedly inside the loop for every bucket, but it 
should only be set once per table since all buckets belong to the same table. 
Move lines 157-160 outside the for loop to avoid redundant operations and 
improve performance.
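   For illustration, a sketch of the hoisted form, rearranging only statements 
that already appear in this diff:
   ```java
   // Set the loop-invariant table path once per table...
   pbLakeTableSnapshotInfo
           .setTablePath()
           .setDatabaseName(tablePath.getDatabaseName())
           .setTableName(tablePath.getTableName());
   // ...and keep only the per-bucket fields inside the loop.
   for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
       PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
               pbLakeTableSnapshotInfo.addBucketsReq();
       TableBucket tableBucket = logEndOffsetEntry.getKey();
       if (tableBucket.getPartitionId() != null) {
           pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
       }
       pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
       pbLakeTableOffsetForBucket.setLogEndOffset(logEndOffsetEntry.getValue());
   }
   ```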



##########
fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java:
##########
@@ -27,7 +27,6 @@
 public class CommittedLakeSnapshot {
 
     private final long lakeSnapshotId;
-
     private final Map<String, String> snapshotProperties;

Review Comment:
   Removed the blank line between fields. While this is minor, it's 
inconsistent with the style in the rest of the class, which separates fields 
with a blank line. Consider keeping the blank line for consistency.
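   For example, restoring the blank line would look like:
   ```suggestion
       private final long lakeSnapshotId;

       private final Map<String, String> snapshotProperties;
   ```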



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -323,33 +285,45 @@ private void checkFlussNotMissingLakeSnapshot(
         // known lake snapshot, which means the data already has been committed to lake,
         // not to commit to lake to avoid data duplicated
         if (missingCommittedSnapshot != null) {
-            if (missingCommittedSnapshot.getSnapshotProperties() == null
-                    || missingCommittedSnapshot
-                                    .getSnapshotProperties()
-                                    .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)
-                            == null) {
+            String lakeSnapshotOffsetPath =
+                    missingCommittedSnapshot
+                            .getSnapshotProperties()
+                            .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+
+            // should only will happen in v0.7 which won't put offsets info
+            // to properties
+            if (lakeSnapshotOffsetPath == null) {
                 throw new IllegalStateException(
                         String.format(
-                                "Missing required log offsets property '%s' in lake snapshot %d for table: ‘tablePath=%s, tableId=%d’. "
-                                        + "This property is required to commit the missing snapshot to Fluss. "
-                                        + "The snapshot may have been created by an older version of Fluss that did not store this information, "
-                                        + "or the snapshot properties may be corrupted.",
+                                "Can't find %s field from snapshot property.",
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
+            }
+
+            // the fluss-offsets will be a json string if it's tiered be v0.8,
+            // since the code path should happen rarely, not to consider back compatibility
+            // throw IllegalStateException directly

Review Comment:
   Comment contains a typo: 'will be a json string if it's tiered be v0.8' 
should be 'will be a json string if it's tiered by v0.8'.
   ```suggestion
               // the fluss-offsets will be a json string if it's tiered by v0.8,
               // since this code path should be rare, we do not consider backward compatibility
               // and throw IllegalStateException directly
   ```



##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java:
##########
@@ -372,6 +374,12 @@ public CompletableFuture<DescribeClusterConfigsResponse> describeClusterConfigs(
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
+            PrepareCommitLakeTableSnapshotRequest request) {
+        return null;

Review Comment:
   The method returns null instead of throwing UnsupportedOperationException 
like other unimplemented methods in this test class (see line 374). For 
consistency and to avoid a potential NullPointerException in tests, consider 
throwing UnsupportedOperationException instead.
   ```suggestion
           throw new UnsupportedOperationException();
   ```



##########
fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java:
##########
@@ -333,6 +335,12 @@ public CompletableFuture<DescribeClusterConfigsResponse> describeClusterConfigs(
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
+            PrepareCommitLakeTableSnapshotRequest request) {
+        return null;

Review Comment:
   The method returns null instead of throwing UnsupportedOperationException 
like other unimplemented methods in this test class (see lines 324, 335). For 
consistency and to avoid potential NullPointerException in tests, consider 
throwing UnsupportedOperationException instead.
   ```suggestion
           throw new UnsupportedOperationException();
   ```



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -449,14 +449,48 @@ message NotifyRemoteLogOffsetsRequest {
 message NotifyRemoteLogOffsetsResponse {
 }
 
+message PrepareCommitLakeTableSnapshotRequest {
+  repeated PbLakeTableSnapshotInfo tables_req = 1;
+}
+
+message PrepareCommitLakeTableSnapshotResponse {
+  repeated PbPrepareCommitLakeTableRespForTable prepare_commit_lake_table_resp = 1;
+}
+
+message PbPrepareCommitLakeTableRespForTable {
+  optional string lake_table_snapshot_file_path = 1;
+  optional int32 error_code = 2;
+  optional string error_message = 3;
+}
+
 message CommitLakeTableSnapshotRequest {
+  // Deprecated: PbLakeTableSnapshotInfo is no longer used for committing lake table snapshots.
+  // Currently, it is only used to allow the coordinator to notify tablet servers about the current
+  // synchronized log end offsets, which are then reported to metrics. In the future, we plan to
+  // have tiering directly report to metrics, and this field will be fully removed.
+  // Still reserve it for protocol compatibility.
   repeated PbLakeTableSnapshotInfo tables_req = 1;
+  // The metadata for lake table snapshots to be committed. Each entry contains the table ID,
+  // snapshot ID, and the file paths where the snapshot data (containing bucket log end offset
+  // information) is stored. The tiered_snapshot_file_path points to the file storing tiered log
+  // end offsets, while readable_snapshot_file_path (if present) points to the file storing readable
+  // log end offsets.
+  repeated PbLakeTableSnapshotMetadata lake_table_snapshot_metadata = 2;
+}
+
+message PbLakeTableSnapshotMetadata {
+  required int64 table_id = 1;
+  required int64 snapshot_id = 2;
+  required string tiered_snapshot_file_path = 3;
+  optional string readable_snapshot_file_path = 4;

Review Comment:
   The new `tiered_snapshot_file_path` and `readable_snapshot_file_path` fields 
are taken directly from client RPC requests and later used to build `FsPath` 
instances that are opened and deleted server-side without any validation (see 
`ServerRpcMessageUtils.getCommitLakeTableSnapshotData` → 
`LakeTable.LakeSnapshotMetadata` → 
`LakeTable.getLatestTableSnapshot`/`discard`). This lets a malicious or 
compromised client call `commitLakeTableSnapshot` with arbitrary filesystem 
paths (e.g., outside the configured remote data directory) and cause the 
coordinator to read and delete any file accessible under its filesystem 
credentials. To avoid arbitrary file read/delete, derive these paths on the 
server from trusted table metadata or, at minimum, enforce strict validation 
that confines them to expected directories and schemes before using them.
   ```suggestion
     // The metadata for lake table snapshots to be committed. Each entry contains the table ID
     // and snapshot ID for which snapshot data (containing bucket log end offset information) is
     // stored. Implementations should derive the actual filesystem paths for snapshot data on the
     // server side from trusted table metadata instead of relying on client-provided paths.
     //
     // Deprecated: PbLakeTableSnapshotMetadata.tiered_snapshot_file_path and
     // PbLakeTableSnapshotMetadata.readable_snapshot_file_path are deprecated and must not be used
     // by new clients; they will be ignored by secure server implementations and are kept only for
     // protocol compatibility.
     repeated PbLakeTableSnapshotMetadata lake_table_snapshot_metadata = 2;
   }
   
   message PbLakeTableSnapshotMetadata {
     required int64 table_id = 1;
     required int64 snapshot_id = 2;
     // Deprecated: this field previously carried a client-specified filesystem path. Servers should
     // instead derive the tiered snapshot file path from trusted metadata and ignore this value.
     required string tiered_snapshot_file_path = 3 [deprecated = true];
     // Deprecated: this field previously carried a client-specified readable snapshot file path.
     // Servers should derive any readable snapshot file path from trusted metadata and ignore this
     // value.
     optional string readable_snapshot_file_path = 4 [deprecated = true];
   ```
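   As a sketch of the strict-validation option (a hypothetical guard, not 
existing Fluss code; `remoteDataDir` stands in for whatever trusted root the 
coordinator is configured with):
   ```java
   import java.net.URI;
   import java.util.Objects;

   // Rejects client-supplied snapshot paths whose scheme, authority, or
   // normalized path escapes the coordinator's trusted remote data directory.
   final class SnapshotPathValidator {
       private final URI trustedRoot;

       SnapshotPathValidator(String remoteDataDir) {
           // normalize once so ".." segments cannot slip through later checks
           this.trustedRoot = URI.create(remoteDataDir).normalize();
       }

       void validate(String candidatePath) {
           URI candidate = URI.create(candidatePath).normalize();
           boolean ok =
                   Objects.equals(trustedRoot.getScheme(), candidate.getScheme())
                           && Objects.equals(trustedRoot.getAuthority(), candidate.getAuthority())
                           && candidate.getPath().startsWith(trustedRoot.getPath() + "/");
           if (!ok) {
               throw new IllegalArgumentException(
                       "Snapshot file path " + candidatePath
                               + " is outside the trusted remote data directory " + trustedRoot);
           }
       }
   }
   ```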



##########
fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java:
##########
@@ -249,4 +251,10 @@ public CompletableFuture<DescribeClusterConfigsResponse> describeClusterConfigs(
             DescribeClusterConfigsRequest request) {
         return null;
     }
+
+    @Override
+    public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
+            PrepareCommitLakeTableSnapshotRequest request) {
+        return null;
+    }

Review Comment:
   The method returns null instead of throwing UnsupportedOperationException 
like other unimplemented methods in this test class (see line 251). For 
consistency and to avoid a potential NullPointerException in tests, consider 
throwing UnsupportedOperationException instead.
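   For example, mirroring the suggestion on the other gateway stubs:
   ```suggestion
        throw new UnsupportedOperationException();
   ```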



##########
fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java:
##########
@@ -591,4 +606,44 @@ protected MetadataResponse processMetadataRequest(
         return buildMetadataResponse(
                 coordinatorServer, aliveTabletServers, tablesMetadata, partitionsMetadata);
     }
+
+    @Override
+    public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
+            PrepareCommitLakeTableSnapshotRequest request) {
+        CompletableFuture<PrepareCommitLakeTableSnapshotResponse> future =
+                new CompletableFuture<>();
+        ioExecutor.submit(
+                () -> {
+                    PrepareCommitLakeTableSnapshotResponse response =
+                            new PrepareCommitLakeTableSnapshotResponse();
+                    for (PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo :
+                            request.getTablesReqsList()) {
+                        PbPrepareCommitLakeTableRespForTable pbPrepareCommitLakeTableRespForTable =
+                                response.addPrepareCommitLakeTableResp();
+                        try {
+                            // upsert lake table snapshot, need to merge the snapshot with previous
+                            // latest snapshot
+                            LakeTableSnapshot lakeTableSnapshot =
+                                    lakeTableHelper.upsertLakeTableSnapshot(
+                                            pbLakeTableSnapshotInfo.getTableId(),
+                                            toLakeSnapshot(pbLakeTableSnapshotInfo));
+                            TablePath tablePath =
+                                    toTablePath(pbLakeTableSnapshotInfo.getTablePath());
+                            FsPath fsPath =
+                                    lakeTableHelper.storeLakeTableSnapshot(
+                                            pbLakeTableSnapshotInfo.getTableId(),
+                                            tablePath,
+                                            lakeTableSnapshot);
+                            pbPrepareCommitLakeTableRespForTable.setLakeTableSnapshotFilePath(
+                                    fsPath.toString());
+                        } catch (Exception e) {
+                            Errors error = ApiError.fromThrowable(e).error();
+                            pbPrepareCommitLakeTableRespForTable.setError(
+                                    error.code(), error.message());
+                        }
+                    }
+                    future.complete(response);

Review Comment:
   The submit operation doesn't handle exceptions that may occur during the 
task execution. If the lambda throws an exception before completing the future, 
the exception will be swallowed. Consider wrapping the entire task in a 
try-catch block that calls future.completeExceptionally(e) on error to ensure 
all exceptions are properly propagated.
   ```suggestion
                    try {
                        PrepareCommitLakeTableSnapshotResponse response =
                                new PrepareCommitLakeTableSnapshotResponse();
                        for (PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo :
                                request.getTablesReqsList()) {
                            PbPrepareCommitLakeTableRespForTable
                                    pbPrepareCommitLakeTableRespForTable =
                                            response.addPrepareCommitLakeTableResp();
                            try {
                                // upsert lake table snapshot, need to merge the snapshot with previous
                                // latest snapshot
                                LakeTableSnapshot lakeTableSnapshot =
                                        lakeTableHelper.upsertLakeTableSnapshot(
                                                pbLakeTableSnapshotInfo.getTableId(),
                                                toLakeSnapshot(pbLakeTableSnapshotInfo));
                                TablePath tablePath =
                                        toTablePath(pbLakeTableSnapshotInfo.getTablePath());
                                FsPath fsPath =
                                        lakeTableHelper.storeLakeTableSnapshot(
                                                pbLakeTableSnapshotInfo.getTableId(),
                                                tablePath,
                                                lakeTableSnapshot);
                                pbPrepareCommitLakeTableRespForTable.setLakeTableSnapshotFilePath(
                                        fsPath.toString());
                            } catch (Exception e) {
                                Errors error = ApiError.fromThrowable(e).error();
                                pbPrepareCommitLakeTableRespForTable.setError(
                                        error.code(), error.message());
                            }
                        }
                        future.complete(response);
                    } catch (Throwable t) {
                        future.completeExceptionally(t);
                    }
   ```



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -449,14 +449,48 @@ message NotifyRemoteLogOffsetsRequest {
 message NotifyRemoteLogOffsetsResponse {
 }
 
+message PrepareCommitLakeTableSnapshotRequest {
+  repeated PbLakeTableSnapshotInfo tables_req = 1;
+}
+
+message PrepareCommitLakeTableSnapshotResponse {
+  repeated PbPrepareCommitLakeTableRespForTable prepare_commit_lake_table_resp = 1;
+}
+
+message PbPrepareCommitLakeTableRespForTable {
+  optional string lake_table_snapshot_file_path = 1;
+  optional int32 error_code = 2;
+  optional string error_message = 3;
+}

Review Comment:
   The PbPrepareCommitLakeTableRespForTable message is missing a table_id 
field. Without this field, clients cannot determine which table each response 
corresponds to when processing a batch request with multiple tables. Add a 
required int64 table_id field to properly identify the table for each response.
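   One possible shape (a sketch only; since this message is new in this PR, 
its field numbers can presumably still be reassigned):
   ```proto
   message PbPrepareCommitLakeTableRespForTable {
     required int64 table_id = 1;
     optional string lake_table_snapshot_file_path = 2;
     optional int32 error_code = 3;
     optional string error_message = 4;
   }
   ```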



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java:
##########
@@ -59,49 +70,149 @@ public void open() {
                        metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
     }
 
-    void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
+    String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets)
+            throws IOException {
+        PbPrepareCommitLakeTableRespForTable prepareCommitResp = null;
+        Exception exception = null;
         try {
-            CommitLakeTableSnapshotRequest request =
-                    toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
-            coordinatorGateway.commitLakeTableSnapshot(request).get();
+            PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
+                    toPrepareCommitLakeTableSnapshotRequest(tableId, tablePath, logEndOffsets);
+            PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
+                    coordinatorGateway
+                            .prepareCommitLakeTableSnapshot(prepareCommitLakeTableSnapshotRequest)
+                            .get();
+            List<PbPrepareCommitLakeTableRespForTable> pbPrepareCommitLakeTableRespForTables =
+                    prepareCommitLakeTableSnapshotResponse.getPrepareCommitLakeTableRespsList();
+            checkState(pbPrepareCommitLakeTableRespForTables.size() == 1);
+            prepareCommitResp = pbPrepareCommitLakeTableRespForTables.get(0);
+            if (prepareCommitResp.hasErrorCode()) {
+                exception = ApiError.fromErrorMessage(prepareCommitResp).exception();
+            }
         } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
             throw new IOException(
                     String.format(
-                            "Fail to commit table lake snapshot %s to Fluss.",
-                            flussTableLakeSnapshot),
-                    ExceptionUtils.stripExecutionException(e));
+                            "Fail to prepare commit table lake snapshot for %s to Fluss.",
+                            tablePath),
+                    ExceptionUtils.stripExecutionException(exception));
         }
+        return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();
     }
 
-    public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndOffsets)
+    void commit(
+            long tableId,
+            long lakeSnapshotId,
+            String lakeSnapshotPath,
+            Map<TableBucket, Long> logEndOffsets,
+            Map<TableBucket, Long> logMaxTieredTimestamps)
             throws IOException {
-        // construct lake snapshot to commit to Fluss
-        FlussTableLakeSnapshot flussTableLakeSnapshot =
-                new FlussTableLakeSnapshot(tableId, snapshotId);
-        for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
-            flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
+        Exception exception = null;
+        try {
+            CommitLakeTableSnapshotRequest request =
+                    toCommitLakeTableSnapshotRequest(
+                            tableId,
+                            lakeSnapshotId,
+                            lakeSnapshotPath,
+                            logEndOffsets,
+                            logMaxTieredTimestamps);
+            List<PbCommitLakeTableSnapshotRespForTable> commitLakeTableSnapshotRespForTables =
+                    coordinatorGateway.commitLakeTableSnapshot(request).get().getTableRespsList();
+            checkState(commitLakeTableSnapshotRespForTables.size() == 1);
+            PbCommitLakeTableSnapshotRespForTable commitLakeTableSnapshotRes =
+                    commitLakeTableSnapshotRespForTables.get(0);
+            if (commitLakeTableSnapshotRes.hasErrorCode()) {
+                exception = ApiError.fromErrorMessage(commitLakeTableSnapshotRes).exception();
+            }
+        } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
+            throw new IOException(
+                    String.format(
+                            "Fail to commit table lake snapshot id %d of table %d to Fluss.",
+                            lakeSnapshotId, tableId),
+                    ExceptionUtils.stripExecutionException(exception));
         }
-        commit(flussTableLakeSnapshot);
     }
 
-    private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
-            FlussTableLakeSnapshot flussTableLakeSnapshot) {
-        CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
-                new CommitLakeTableSnapshotRequest();
+    private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRequest(
+            long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets) {
+        PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
+                new PrepareCommitLakeTableSnapshotRequest();
         PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
-                commitLakeTableSnapshotRequest.addTablesReq();
+                prepareCommitLakeTableSnapshotRequest.addTablesReq();
+        pbLakeTableSnapshotInfo.setTableId(tableId);
 
-        pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
-        pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
-        for (TableBucket tableBucket : flussTableLakeSnapshot.tableBuckets()) {
+        // in prepare phase, we don't know the snapshot id,
+        // set -1 since the field is required
+        pbLakeTableSnapshotInfo.setSnapshotId(-1L);
+        for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
             PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
                     pbLakeTableSnapshotInfo.addBucketsReq();
-            long endOffset = flussTableLakeSnapshot.getLogEndOffset(tableBucket);
+            TableBucket tableBucket = logEndOffsetEntry.getKey();
+            pbLakeTableSnapshotInfo
+                    .setTablePath()
+                    .setDatabaseName(tablePath.getDatabaseName())
+                    .setTableName(tablePath.getTableName());
             if (tableBucket.getPartitionId() != null) {
                 pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
             }
             pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
-            pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
+            pbLakeTableOffsetForBucket.setLogEndOffset(logEndOffsetEntry.getValue());
+        }
+        return prepareCommitLakeTableSnapshotRequest;
+    }
+
+    private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
+            long tableId,
+            long snapshotId,
+            String lakeSnapshotPath,
+            Map<TableBucket, Long> logEndOffsets,
+            Map<TableBucket, Long> logMaxTieredTimestamps) {
+        CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
+                new CommitLakeTableSnapshotRequest();
+
+        // Add lake table snapshot metadata
+        PbLakeTableSnapshotMetadata pbLakeTableSnapshotMetadata =
+                commitLakeTableSnapshotRequest.addLakeTableSnapshotMetadata();
+        pbLakeTableSnapshotMetadata.setSnapshotId(snapshotId);
+        pbLakeTableSnapshotMetadata.setTableId(tableId);
+        // tiered snapshot file path is equal to readable snapshot currently
+        pbLakeTableSnapshotMetadata.setTieredSnapshotFilePath(lakeSnapshotPath);
+        pbLakeTableSnapshotMetadata.setReadableSnapshotFilePath(lakeSnapshotPath);
+
+        // Add PbLakeTableSnapshotInfo for metrics reporting (to notify tablet servers about
+        // synchronized log end offsets and max timestamps)
+        if (!logEndOffsets.isEmpty()) {
+            PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
+                    commitLakeTableSnapshotRequest.addTablesReq();
+            for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
+
+                pbLakeTableSnapshotInfo.setTableId(tableId);
+                pbLakeTableSnapshotInfo.setSnapshotId(snapshotId);

Review Comment:
   The tableId and snapshotId are set repeatedly inside the loop for every 
bucket. Since all buckets in the loop belong to the same table and snapshot, 
these fields should be set once before the loop begins, not for each iteration. 
Move lines 195-196 outside the for loop.
   ```suggestion
            pbLakeTableSnapshotInfo.setTableId(tableId);
            pbLakeTableSnapshotInfo.setSnapshotId(snapshotId);
            for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

