Copilot commented on code in PR #2223:
URL: https://github.com/apache/fluss/pull/2223#discussion_r2646367008


##########
fluss-common/src/test/java/org/apache/fluss/utils/json/TableBucketOffsetsJsonSerdeTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.utils.json;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test for {@link TableBucketOffsetsJsonSerde}. */
+class TableBucketOffsetsJsonSerdeTest extends JsonSerdeTestBase<TableBucketOffsets> {

Review Comment:
   Unused class: TableBucketOffsetsJsonSerdeTest is not referenced within this codebase. If it is not used as an external API, it should be removed.
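   For context on how such a class is picked up: JUnit test classes are discovered by the test engine rather than referenced from other code, so the absence of references alone may not mean the test is unused. Below is a minimal sketch of that discovery pattern, assuming standard JUnit 5 plus AssertJ; it is illustrative only and does not reflect the actual JsonSerdeTestBase API.
   ```java
   import org.junit.jupiter.api.Test;

   import static org.assertj.core.api.Assertions.assertThat;

   // Discovered and run by the JUnit engine via the @Test annotation;
   // no other class needs to reference it.
   class ExampleDiscoveredTest {

       @Test
       void roundTrip() {
           // a serde test would serialize an object to JSON and back, then compare;
           // shown here with a trivial placeholder assertion
           String json = "{\"bucket\":0,\"offset\":42}";
           assertThat(json).contains("offset");
       }
   }
   ```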



##########
fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java:
##########
@@ -27,14 +28,32 @@
 /** The data for request {@link CommitLakeTableSnapshotRequest}. */
 public class CommitLakeTableSnapshotData {
 
+    /**
+     * Since 0.9, this field is only used to allow the coordinator to send requests to tablet
+     * servers, enabling tablet servers to report metrics about synchronized log end offsets. In the
+     * future, we plan to have the tiering service directly report metrics, and this field will be
+     * removed.
+     */

Review Comment:
   The comment on line 34 says "this field will be removed", which contradicts the JavaDoc above that says the field "will be fully removed". Additionally, the comment mentions this is "only used" for notification, but the field is still documented in the constructor parameter comments on lines 31-36. Consider making these comments consistent about when and how this field will be deprecated.
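   One way to keep the two comments aligned is a single JavaDoc with a @deprecated tag, as in the sketch below. This is illustrative only: the field name and type are placeholders, not the actual members of CommitLakeTableSnapshotData.
   ```java
   import java.util.Map;

   // Sketch of consistent documentation for a field that is kept only for
   // the coordinator-to-tablet-server notification path.
   public class CommitDataDocSketch {

       /**
        * Since 0.9 this field only lets the coordinator notify tablet servers so they can
        * report metrics about synchronized log end offsets.
        *
        * @deprecated Will be removed once the tiering service reports these metrics directly.
        */
       @Deprecated
       private final Map<Long, Object> lakeTableSnapshots; // placeholder type

       public CommitDataDocSketch(Map<Long, Object> lakeTableSnapshots) {
           this.lakeTableSnapshots = lakeTableSnapshots;
       }
   }
   ```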



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1224,6 +1225,48 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
     private void tryProcessCommitLakeTableSnapshot(
             CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
             CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+        CommitLakeTableSnapshotData commitLakeTableSnapshotData =
+                commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
+        if (commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas().isEmpty()) {
+            handleCommitLakeTableSnapshotV1(commitLakeTableSnapshotEvent, callback);
+        } else {
+            Map<Long, LakeTable.LakeSnapshotMetadata> lakeSnapshotMetadatas =
+                    commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas();
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            CommitLakeTableSnapshotResponse response =
+                                    new CommitLakeTableSnapshotResponse();
+                            for (Map.Entry<Long, LakeTable.LakeSnapshotMetadata>
+                                    lakeSnapshotMetadataEntry : lakeSnapshotMetadatas.entrySet()) {
+                                PbCommitLakeTableSnapshotRespForTable tableResp =
+                                        response.addTableResp();
+                                long tableId = lakeSnapshotMetadataEntry.getKey();
+                                tableResp.setTableId(tableId);
+                                try {
+                                    lakeTableHelper.addLakeTableSnapshotMetadata(
+                                            tableId, lakeSnapshotMetadataEntry.getValue());
+                                } catch (Exception e) {
+                                    ApiError error = ApiError.fromThrowable(e);
+                                    tableResp.setError(error.error().code(), error.message());

Review Comment:
   If the coordinator fails to add the lake table snapshot metadata, the file 
written in the prepare phase is never cleaned up. Consider tracking prepared 
files and implementing cleanup on failure, or document that cleanup will happen 
through other mechanisms (e.g., garbage collection).
   ```suggestion
                                    tableResp.setError(error.error().code(), error.message());
                                    // Note: Any cleanup of prepared lake table snapshot files on failure
                                    // is expected to be handled by LakeTableHelper or other background
                                    // mechanisms (for example, garbage collection of unreferenced files).
                                    LOG.warn(
                                            "Failed to add lake table snapshot metadata for table {}. "
                                                    + "Prepared snapshot files may be cleaned up by background "
                                                    + "cleanup or garbage collection.",
                                            tableId,
                                            e);
   ```
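   If tracking the prepared file and cleaning it up directly is preferred over background garbage collection, the general shape could look like the sketch below. It is not the Fluss implementation; the helper class, the Path-based I/O, and the Runnable callback are illustrative stand-ins.
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;

   final class PreparedSnapshotCleanup {

       /** Commits the metadata; on failure, best-effort deletes the file written in the prepare phase. */
       static void commitOrCleanup(Path preparedFile, Runnable commitMetadata) {
           try {
               commitMetadata.run();
           } catch (RuntimeException e) {
               try {
                   // remove the prepared snapshot file so it is not leaked on failure
                   Files.deleteIfExists(preparedFile);
               } catch (IOException cleanupError) {
                   e.addSuppressed(cleanupError);
               }
               throw e;
           }
       }
   }
   ```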



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -323,33 +284,45 @@ private void checkFlussNotMissingLakeSnapshot(
         // known lake snapshot, which means the data already has been committed to lake,
         // not to commit to lake to avoid data duplicated
         if (missingCommittedSnapshot != null) {
-            if (missingCommittedSnapshot.getSnapshotProperties() == null
-                    || missingCommittedSnapshot
-                                    .getSnapshotProperties()
-                                    .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)
-                            == null) {
+            String lakeSnapshotOffsetPath =
+                    missingCommittedSnapshot
+                            .getSnapshotProperties()
+                            .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+
+            // this should only happen in v0.7, which won't put offsets info
+            // to properties
+            if (lakeSnapshotOffsetPath == null) {
                 throw new IllegalStateException(
                         String.format(
-                                "Missing required log offsets property '%s' in lake snapshot %d for table: ‘tablePath=%s, tableId=%d’. "
-                                        + "This property is required to commit the missing snapshot to Fluss. "
-                                        + "The snapshot may have been created by an older version of Fluss that did not store this information, "
-                                        + "or the snapshot properties may be corrupted.",
+                                "Can't find %s field from snapshot property.",
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
+            }
+
+            // the fluss-offsets will be a json string if it's tiered by v0.8,
+            // since this code path should be rare, we do not consider backward compatibility
+            // and throw IllegalStateException directly
+            String trimmedPath = lakeSnapshotOffsetPath.trim();
+            if (trimmedPath.startsWith("{")) {

Review Comment:
   The trimmedPath.startsWith("{") check is fragile. A legitimate file path 
could theoretically start with '{'. Consider a more robust check, such as 
attempting to parse as JSON or checking for a scheme/protocol indicator in the 
path string.
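   A sketch of the "attempt to parse as JSON" alternative is below. Jackson is assumed to be on the classpath (Fluss ships JSON serde utilities), but the helper class and method here are illustrative, not existing Fluss code.
   ```java
   import com.fasterxml.jackson.databind.JsonNode;
   import com.fasterxml.jackson.databind.ObjectMapper;

   final class OffsetPropertyFormat {

       private static final ObjectMapper MAPPER = new ObjectMapper();

       /** Returns true only if the property value is a well-formed inline JSON object (v0.8 style). */
       static boolean isInlineJsonOffsets(String value) {
           String trimmed = value.trim();
           if (!trimmed.startsWith("{")) {
               return false;
           }
           try {
               JsonNode node = MAPPER.readTree(trimmed);
               return node != null && node.isObject();
           } catch (Exception e) {
               // not valid JSON; treat the value as a file path (v0.9+ style)
               return false;
           }
       }
   }
   ```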



##########
fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java:
##########
@@ -591,4 +607,45 @@ protected MetadataResponse processMetadataRequest(
         return buildMetadataResponse(
                 coordinatorServer, aliveTabletServers, tablesMetadata, partitionsMetadata);
     }
+
+    @Override
+    public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
+            PrepareCommitLakeTableSnapshotRequest request) {
+        CompletableFuture<PrepareCommitLakeTableSnapshotResponse> future =
+                new CompletableFuture<>();
+        ioExecutor.submit(
+                () -> {
+                    PrepareCommitLakeTableSnapshotResponse response =
+                            new PrepareCommitLakeTableSnapshotResponse();
+                    try {
+                        for (PbTableBucketOffsets bucketOffsets : request.getBucketOffsetsList()) {
+                            PbPrepareCommitLakeTableRespForTable
+                                    pbPrepareCommitLakeTableRespForTable =
+                                            response.addPrepareCommitLakeTableResp();
+                            try {
+                                // upsert lake table snapshot, need to merge the snapshot with
+                                // previous latest snapshot
+                                TableBucketOffsets tableBucketOffsets =
+                                        lakeTableHelper.upsertTableBucketOffsets(
+                                                bucketOffsets.getTableId(),
+                                                toTableBucketOffsets(bucketOffsets));
+                                TablePath tablePath = toTablePath(bucketOffsets.getTablePath());
+                                FsPath fsPath =
+                                        lakeTableHelper.storeLakeTableBucketOffsets(
+                                                tablePath, tableBucketOffsets);
+                                pbPrepareCommitLakeTableRespForTable.setLakeTableBucketOffsetsPath(
+                                        fsPath.toString());
+                            } catch (Exception e) {
+                                Errors error = ApiError.fromThrowable(e).error();
+                                pbPrepareCommitLakeTableRespForTable.setError(
+                                        error.code(), error.message());
+                            }

Review Comment:
   The error handling here only sets an error message for individual table 
failures, but if an exception occurs at line 645, the entire response will 
fail. This means a failure in one table could prevent other tables from being 
processed. Consider catching exceptions inside the loop to allow partial 
success.
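   For illustration, the partial-success pattern described above looks roughly like the sketch below: each table is handled in its own try/catch, so one failure only marks that table's response while the loop continues. The types and the prepareTable callback are simplified stand-ins, not the Fluss RPC classes.
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Function;

   final class PartialSuccessLoop {

       static final class TableResult {
           final long tableId;
           final String offsetsPath; // null if this table failed
           final String error;       // null if this table succeeded

           TableResult(long tableId, String offsetsPath, String error) {
               this.tableId = tableId;
               this.offsetsPath = offsetsPath;
               this.error = error;
           }
       }

       /** Processes every table even if some of them fail. */
       static List<TableResult> process(List<Long> tableIds, Function<Long, String> prepareTable) {
           List<TableResult> results = new ArrayList<>();
           for (long tableId : tableIds) {
               try {
                   results.add(new TableResult(tableId, prepareTable.apply(tableId), null));
               } catch (RuntimeException e) {
                   // record the error for this table only; the loop keeps going
                   results.add(new TableResult(tableId, null, e.getMessage()));
               }
           }
           return results;
       }
   }
   ```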



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java:
##########
@@ -283,18 +285,18 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
         verifyLakeSnapshot(
                 tablePath,
                 tableId,
-                2,
-                getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
+                0,
+                expectedLogEndOffsets,
                 String.format(
                         "The current Fluss's lake snapshot %s is less than lake actual snapshot %d committed by Fluss for table: {tablePath=%s, tableId=%d},"
                                 + " missing snapshot: %s.",
                         null,
-                        mockCommittedSnapshot.getLakeSnapshotId(),
+                        mockMissingCommittedLakeSnapshot.getLakeSnapshotId(),
                         tablePath,
                         tableId,
-                        mockCommittedSnapshot));
+                        mockMissingCommittedLakeSnapshot));

Review Comment:
   The variable is named 'mockMissingCommittedLakeSnapshot' but the comment 
says 'mockCommittedLakeSnapshot'. For clarity and consistency with the new 
name, update the comment to reference the correct variable name 
'mockMissingCommittedLakeSnapshot'.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
