Copilot commented on code in PR #2223:
URL: https://github.com/apache/fluss/pull/2223#discussion_r2645628123


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java:
##########
@@ -59,53 +85,196 @@ public void open() {
                         metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
     }
 
-    void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
+    String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets)
+            throws IOException {
+        PbPrepareCommitLakeTableRespForTable prepareCommitResp = null;
+        Exception exception = null;
         try {
-            CommitLakeTableSnapshotRequest request =
-                    toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
-            coordinatorGateway.commitLakeTableSnapshot(request).get();
+            PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
+                    toPrepareCommitLakeTableSnapshotRequest(tableId, tablePath, logEndOffsets);
+            PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
+                    coordinatorGateway
+                            .prepareCommitLakeTableSnapshot(prepareCommitLakeTableSnapshotRequest)
+                            .get();
+            List<PbPrepareCommitLakeTableRespForTable> pbPrepareCommitLakeTableRespForTables =
+                    prepareCommitLakeTableSnapshotResponse.getPrepareCommitLakeTableRespsList();
+            checkState(pbPrepareCommitLakeTableRespForTables.size() == 1);
+            prepareCommitResp = pbPrepareCommitLakeTableRespForTables.get(0);
+            if (prepareCommitResp.hasErrorCode()) {
+                exception = ApiError.fromErrorMessage(prepareCommitResp).exception();
+            }
         } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
             throw new IOException(
                     String.format(
-                            "Fail to commit table lake snapshot %s to Fluss.",
-                            flussTableLakeSnapshot),
-                    ExceptionUtils.stripExecutionException(e));
+                            "Fail to prepare commit table lake snapshot for %s to Fluss.",
+                            tablePath),
+                    ExceptionUtils.stripExecutionException(exception));
         }
+        return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();

Review Comment:
   If the prepare phase succeeds but the stored response carries an error, the method at line 117 will throw a NullPointerException when calling `getLakeTableSnapshotFilePath()` on a prepareCommitResp whose only populated fields are the error fields, so the file path field is unset. Either throw the error before attempting to access the file path, or guard the access as in the suggestion below; a compact sketch combining both follows the suggestion.
   ```suggestion
   
           if (prepareCommitResp == null) {
               throw new IOException(
                       String.format(
                               "Prepare commit response is null for table %s; 
no lake snapshot path returned.",
                               tablePath));
           }
   
           String lakeTableSnapshotFilePath = prepareCommitResp.getLakeTableSnapshotFilePath();
           if (lakeTableSnapshotFilePath == null) {
               throw new IOException(
                       String.format(
                               "Prepare commit response for table %s does not 
contain a lake table snapshot file path.",
                               tablePath));
           }
   
           return lakeTableSnapshotFilePath;
   ```
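   A compact variant that combines both ideas (throw the server-side error first, then guard the file path access) could look like the fragment below; it is a sketch, not a committed implementation. `hasErrorCode()`, `ApiError.fromErrorMessage(...)`, and `getLakeTableSnapshotFilePath()` are taken from this PR, while `hasLakeTableSnapshotFilePath()` assumes the generated response class exposes a presence check for that field and would need verifying:
   ```java
           // Sketch only: surface the server-side error first, then guard the file path access.
           if (prepareCommitResp.hasErrorCode()) {
               throw new IOException(
                       String.format("Prepare commit for table %s returned an error.", tablePath),
                       ApiError.fromErrorMessage(prepareCommitResp).exception());
           }
           // hasLakeTableSnapshotFilePath() is assumed, not confirmed against the generated Pb class.
           if (!prepareCommitResp.hasLakeTableSnapshotFilePath()) {
               throw new IOException(
                       String.format(
                               "Prepare commit response for table %s has no lake table snapshot file path.",
                               tablePath));
           }
           return prepareCommitResp.getLakeTableSnapshotFilePath();
   ```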



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -119,24 +176,28 @@ private LakeTableSnapshot mergeLakeTable(
         return new LakeTableSnapshot(newLakeTableSnapshot.getSnapshotId(), bucketLogEndOffset);
     }
 
-    private FsPath storeLakeTableSnapshot(
+    public FsPath storeLakeTableSnapshot(
             long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot)
             throws Exception {
-        // get the remote file path to store the lake table snapshot information
-        FsPath remoteLakeTableSnapshotManifestPath =
-                FlussPaths.remoteLakeTableSnapshotManifestPath(remoteDataDir, tablePath, tableId);
+        // get the remote file path to store the lake table snapshot offset information
+        FsPath remoteLakeTableSnapshotOffsetPath =
+                FlussPaths.remoteLakeTableSnapshotOffsetPath(remoteDataDir, tablePath, tableId);
         // check whether the parent directory exists, if not, create the directory
-        FileSystem fileSystem = remoteLakeTableSnapshotManifestPath.getFileSystem();
-        if (!fileSystem.exists(remoteLakeTableSnapshotManifestPath.getParent())) {
-            fileSystem.mkdirs(remoteLakeTableSnapshotManifestPath.getParent());
+        FileSystem fileSystem = remoteLakeTableSnapshotOffsetPath.getFileSystem();
+        if (!fileSystem.exists(remoteLakeTableSnapshotOffsetPath.getParent())) {
+            fileSystem.mkdirs(remoteLakeTableSnapshotOffsetPath.getParent());
         }
-        // serialize table snapshot to json bytes, and write to file
-        byte[] jsonBytes = LakeTableSnapshotJsonSerde.toJson(lakeTableSnapshot);
+        // serialize table offsets to json bytes, and write to file
+        byte[] jsonBytes =
+                JsonSerdeUtils.writeValueAsBytes(
+                        new TableBucketOffsets(tableId, lakeTableSnapshot.getBucketLogEndOffset()),
+                        TableBucketOffsetsJsonSerde.INSTANCE);
+
         try (FSDataOutputStream outputStream =
                 fileSystem.create(
-                        remoteLakeTableSnapshotManifestPath, FileSystem.WriteMode.OVERWRITE)) {
+                        remoteLakeTableSnapshotOffsetPath, FileSystem.WriteMode.OVERWRITE)) {
             outputStream.write(jsonBytes);
         }
-        return remoteLakeTableSnapshotManifestPath;
+        return remoteLakeTableSnapshotOffsetPath;
     }

Review Comment:
   The method generates a new random UUID for each snapshot offset file path, but no cleanup mechanism for the old files is mentioned. Over time these files will accumulate in the metadata directory and may eventually exhaust storage. Consider a cleanup strategy for superseded snapshot offset files; one possible shape is sketched below.
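   A hedged sketch of such a cleanup, run after the new offset file is written and the pointer in ZooKeeper has switched to it: delete the file referenced by the superseded metadata. `previousOffsetPath` is a hypothetical handle to that old path, and `delete(FsPath, boolean)` is assumed to exist on Fluss's `FileSystem` alongside the `exists`/`mkdirs`/`create` calls already used in this method:
   ```java
           // Sketch only: remove the offset file that the previous metadata pointed at.
           // previousOffsetPath is hypothetical; FileSystem#delete is assumed, not verified.
           if (previousOffsetPath != null
                   && !previousOffsetPath.equals(remoteLakeTableSnapshotOffsetPath)) {
               FileSystem fs = previousOffsetPath.getFileSystem();
               if (fs.exists(previousOffsetPath)) {
                   fs.delete(previousOffsetPath, false); // a single JSON file, no recursion needed
               }
           }
   ```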



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java:
##########
@@ -61,28 +61,35 @@ public class LakeTableJsonSerde implements JsonSerializer<LakeTable>, JsonDeseri
 
     @Override
     public void serialize(LakeTable lakeTable, JsonGenerator generator) throws IOException {
-        generator.writeStartObject();
-        generator.writeNumberField(VERSION_KEY, CURRENT_VERSION);
-
-        generator.writeArrayFieldStart(LAKE_SNAPSHOTS);
-        for (LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata :
-                checkNotNull(lakeTable.getLakeSnapshotMetadatas())) {
+        // if lake table snapshot is null, it must be version 1
+        if (lakeTable.getLakeTableSnapshot() != null) {

Review Comment:
   The code comment is inverted relative to the condition it describes. When `lakeTable.getLakeTableSnapshot() != null`, the table is in the V1 format (full snapshot data in ZK), but the comment says "if lake table snapshot is null, it must be version 1". It should read "if lake table snapshot is not null, it must be version 1"; see the snippet below.
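   Concretely, the corrected comment over the unchanged condition would read:
   ```java
           // if lake table snapshot is not null, it must be version 1 (full snapshot data in ZK)
           if (lakeTable.getLakeTableSnapshot() != null) {
   ```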



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
