Copilot commented on code in PR #2037:
URL: https://github.com/apache/fluss/pull/2037#discussion_r2570280285


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1161,7 +1161,13 @@ private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot(
             tableResp.setTableId(tableId);
 
             try {
-                zooKeeperClient.upsertLakeTableSnapshot(tableId, lakeTableSnapshotEntry.getValue());
+                TablePath tablePath = coordinatorContext.getTablePathById(tableId);
+                if (tablePath == null) {
+                    throw new RuntimeException(
+                            String.format("Failed to find table path for table id: %d", tableId));
+                }

Review Comment:
   When `tablePath` is null, a RuntimeException is thrown, but any snapshot file
already created is not cleaned up, which could leave orphaned snapshot files in
remote storage. Consider cleaning up the created snapshot file in the catch
block or before throwing the exception.



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1031,17 +1048,36 @@ public void upsertLakeTableSnapshot(long tableId, LakeTableSnapshot lakeTableSna
                             bucketLogEndOffset,
                             bucketMaxTimestamp,
                             partitionNameById);
-            zkClient.setData().forPath(path, LakeTableZNode.encode(lakeTableSnapshot));
+        }
+
+        LakeTable lakeTable =
+                LakeTableUtils.storeLakeTableSnapshot(
+                        configuration, tablePath, tableId, lakeTableSnapshot);
+        byte[] zkData = LakeTableZNode.encode(lakeTable);
+        if (optPreviousLakeTable.isPresent()) {
+            zkClient.setData().forPath(zkPath, zkData);
+            optPreviousLakeTable.get().discard();
         } else {
-            zkClient.create()
-                    .creatingParentsIfNeeded()
-                    .forPath(path, LakeTableZNode.encode(lakeTableSnapshot));
+            zkClient.create().creatingParentsIfNeeded().forPath(zkPath, zkData);
         }
     }
 
+    /**
+     * Gets the {@link LakeTable} for the given table ID.
+     *
+     * @param tableId the table ID
+     * @return an Optional containing the LakeTable if it exists, empty otherwise
+     * @throws Exception if the operation fails
+     */
+    public Optional<LakeTable> getLakeTable(long tableId) throws Exception {
+        String zkPath = LakeTableZNode.path(tableId);
+        return getOrEmpty(zkPath).map(LakeTableZNode::decode);
+    }
+
+    /** . */

Review Comment:
   The Javadoc comment contains only a period. It should describe what the
method does, as the Javadoc for the `getLakeTable` method above it does.
   ```suggestion
       /**
        * Gets the {@link LakeTableSnapshot} for the given table ID.
        *
        * @param tableId the table ID
        * @return an Optional containing the LakeTableSnapshot if the table exists, empty otherwise
        * @throws Exception if the operation fails
        */
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableUtils.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FSDataOutputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.FlussPaths;
+
+import java.io.IOException;
+
+/** todo . */

Review Comment:
   The Javadoc comment contains only "todo .". It should describe the purpose
of this utility class and its methods.
   ```suggestion
   /**
    * Utility class for operations related to LakeTable snapshots in the data lake.
    * <p>
    * Provides methods to store LakeTable snapshots to a remote file system.
    * </p>
    *
    * <p>
    * The main method {@link #storeLakeTableSnapshot(Configuration, TablePath, long, LakeTableSnapshot)}
    * serializes a {@link LakeTableSnapshot} to JSON and writes it to the remote file system,
    * returning a {@link LakeTable} referencing the stored snapshot.
    * </p>
    */
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -1031,17 +1048,36 @@ public void upsertLakeTableSnapshot(long tableId, LakeTableSnapshot lakeTableSna
                             bucketLogEndOffset,
                             bucketMaxTimestamp,
                             partitionNameById);
-            zkClient.setData().forPath(path, LakeTableZNode.encode(lakeTableSnapshot));
+        }
+
+        LakeTable lakeTable =
+                LakeTableUtils.storeLakeTableSnapshot(
+                        configuration, tablePath, tableId, lakeTableSnapshot);
+        byte[] zkData = LakeTableZNode.encode(lakeTable);
+        if (optPreviousLakeTable.isPresent()) {
+            zkClient.setData().forPath(zkPath, zkData);
+            optPreviousLakeTable.get().discard();
         } else {
-            zkClient.create()
-                    .creatingParentsIfNeeded()
-                    .forPath(path, LakeTableZNode.encode(lakeTableSnapshot));
+            zkClient.create().creatingParentsIfNeeded().forPath(zkPath, zkData);
         }

Review Comment:
   If the ZK operation (`setData` or `create`) fails after the snapshot file
has been created at line 1054, the new file is orphaned in remote storage.
Consider wrapping the ZK operations in a try-catch block and deleting the new
snapshot file if the operation fails.
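
   A minimal sketch of the cleanup pattern described above, not the PR's
actual code. It uses `java.nio.file` in place of the Fluss `FileSystem` API,
and `MetadataStore` is a hypothetical stand-in for the ZooKeeper `setData` /
`create` call. The file is written first, and if publishing its path fails,
the file is deleted before the original exception is rethrown.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch: write a snapshot file, publish its path, delete the file if publishing fails. */
final class SnapshotCommitSketch {

    /** Hypothetical stand-in for the ZooKeeper setData/create call. */
    interface MetadataStore {
        void publish(String snapshotPath) throws Exception;
    }

    static Path storeAndPublish(Path dir, String fileName, String json, MetadataStore store)
            throws Exception {
        Path file = dir.resolve(fileName);
        // Step 1: create the snapshot file (the side effect that may be orphaned).
        Files.write(file, json.getBytes(StandardCharsets.UTF_8));
        try {
            // Step 2: publish the path; this is the operation that can fail.
            store.publish(file.toString());
        } catch (Exception e) {
            // Step 3: best-effort cleanup so a failed publish leaves no orphaned file.
            try {
                Files.deleteIfExists(file);
            } catch (IOException cleanupFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
        return file;
    }
}
```

   Attaching a cleanup failure as a suppressed exception keeps the original
ZK error as the one the caller sees.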



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Represents lake table snapshot information stored in {@link ZkData.LakeTableZNode}.
+ *
+ * <p>This class supports two storage formats:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Contains the full {@link LakeTableSnapshot} data directly
+ *   <li>Version 2 (current): Contains only the metadata file path, with actual data in remote file
+ * </ul>
+ *
+ * @see LakeTableJsonSerde for JSON serialization and deserialization
+ */
+public class LakeTable {
+
+    @Nullable private final FsPath metadataPath;
+    @Nullable private final LakeTableSnapshot lakeTableSnapshot;
+
+    /**
+     * Creates a LakeTable from a LakeTableSnapshot (version 1 format).
+     *
+     * @param lakeTableSnapshot the snapshot data
+     */
+    public LakeTable(LakeTableSnapshot lakeTableSnapshot) {
+        this.lakeTableSnapshot = lakeTableSnapshot;
+        this.metadataPath = null;
+    }
+
+    /**
+     * Creates a LakeTable with a metadata file path (version 2 format).
+     *
+     * @param metadataPath the path to the metadata file containing the snapshot data
+     */
+    public LakeTable(@Nullable FsPath metadataPath) {
+        this.metadataPath = metadataPath;
+        this.lakeTableSnapshot = null;
+    }
+
+    @Nullable
+    public FsPath getMetadataPath() {
+        return metadataPath;
+    }
+
+    /**
+     * Converts this LakeTable to a LakeTableSnapshot.
+     *
+     * <p>If this LakeTable was created from a LakeTableSnapshot (version 1), returns it directly.
+     * Otherwise, reads the snapshot data from the metadata file.
+     *
+     * @return the LakeTableSnapshot
+     * @throws IOException if the snapshot cannot be read from the file

Review Comment:
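   The Javadoc for `toLakeTableSnapshot` documents `@throws IOException`, but
the method does not declare it and wraps any IOException in a
RuntimeException, so callers can never observe an IOException.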
   ```suggestion
        * @throws RuntimeException if the snapshot cannot be read from the file due to an IOException
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Represents lake table snapshot information stored in {@link ZkData.LakeTableZNode}.
+ *
+ * <p>This class supports two storage formats:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Contains the full {@link LakeTableSnapshot} data directly
+ *   <li>Version 2 (current): Contains only the metadata file path, with actual data in remote file
+ * </ul>
+ *
+ * @see LakeTableJsonSerde for JSON serialization and deserialization
+ */
+public class LakeTable {
+
+    @Nullable private final FsPath metadataPath;
+    @Nullable private final LakeTableSnapshot lakeTableSnapshot;
+
+    /**
+     * Creates a LakeTable from a LakeTableSnapshot (version 1 format).
+     *
+     * @param lakeTableSnapshot the snapshot data
+     */
+    public LakeTable(LakeTableSnapshot lakeTableSnapshot) {
+        this.lakeTableSnapshot = lakeTableSnapshot;
+        this.metadataPath = null;
+    }
+
+    /**
+     * Creates a LakeTable with a metadata file path (version 2 format).
+     *
+     * @param metadataPath the path to the metadata file containing the snapshot data
+     */
+    public LakeTable(@Nullable FsPath metadataPath) {
+        this.metadataPath = metadataPath;
+        this.lakeTableSnapshot = null;
+    }
+
+    @Nullable
+    public FsPath getMetadataPath() {
+        return metadataPath;
+    }
+
+    /**
+     * Converts this LakeTable to a LakeTableSnapshot.
+     *
+     * <p>If this LakeTable was created from a LakeTableSnapshot (version 1), returns it directly.
+     * Otherwise, reads the snapshot data from the metadata file.
+     *
+     * @return the LakeTableSnapshot
+     * @throws IOException if the snapshot cannot be read from the file
+     */
+    public LakeTableSnapshot toLakeTableSnapshot() {
+        if (lakeTableSnapshot != null) {
+            return lakeTableSnapshot;
+        }
+        try {
+            FSDataInputStream inputStream = openInputStream();
+            try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+                IOUtils.copyBytes(inputStream, outputStream, true);
+                return LakeTableSnapshotJsonSerde.fromJson(outputStream.toByteArray());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   The IOException is wrapped in a bare RuntimeException. The cause is
preserved, but callers cannot distinguish a snapshot read failure from any
other runtime error, and the Javadoc still documents IOException. Consider
using a more specific exception type (e.g., a custom exception like
`LakeTableSnapshotReadException`) that keeps the IOException as the cause,
declaring `throws IOException` on the method, or updating the Javadoc to state
that RuntimeException is thrown instead of IOException.
   ```suggestion
       public LakeTableSnapshot toLakeTableSnapshot() throws IOException {
           if (lakeTableSnapshot != null) {
               return lakeTableSnapshot;
           }
           FSDataInputStream inputStream = openInputStream();
           try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
               IOUtils.copyBytes(inputStream, outputStream, true);
               return LakeTableSnapshotJsonSerde.fromJson(outputStream.toByteArray());
   ```
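
   For the custom-exception option mentioned in the comment, a rough sketch
follows. `LakeTableSnapshotReadException` does not exist in the PR, and both
`StreamOpener` and `readAll` are hypothetical names standing in for
`openInputStream()` and the read logic. Plain `java.io` streams are used
instead of the Fluss `FSDataInputStream` and `IOUtils` so the example is
self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/** Sketch: surface read failures as a dedicated unchecked exception with the cause attached. */
final class SnapshotReadSketch {

    /** Hypothetical exception type; not part of the PR. */
    static final class LakeTableSnapshotReadException extends RuntimeException {
        LakeTableSnapshotReadException(String message, IOException cause) {
            super(message, cause);
        }
    }

    /** Hypothetical stand-in for opening the metadata file. */
    interface StreamOpener {
        InputStream open() throws IOException;
    }

    static byte[] readAll(String path, StreamOpener opener) {
        // try-with-resources closes both streams even when the read fails.
        try (InputStream in = opener.open();
                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            // Name the file in the message and keep the IOException as the cause.
            throw new LakeTableSnapshotReadException(
                    "Failed to read lake table snapshot from " + path, e);
        }
    }
}
```

   Callers can then catch the specific type when they want to handle a failed
snapshot read, without the method signature having to declare a checked
exception.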



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
