Copilot commented on code in PR #2037:
URL: https://github.com/apache/fluss/pull/2037#discussion_r2587742529


##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Represents lake table snapshot information stored in {@link ZkData.LakeTableZNode}.
+ *
+ * <p>This class supports two storage formats:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Contains the full {@link LakeTableSnapshot} data directly
+ *   <li>Version 2 (current): Contains only the file paths points to the file storing {@link
+ *       LakeTableSnapshot}, with actual data in remote file. It'll contain multiple file paths
+ *       points to track different lake table snapshot.

Review Comment:
   The comment states "It'll contain multiple file paths points to track different lake table snapshot" but currently the implementation only contains one file path (`lakeTableLatestSnapshotFileHandle`). The `lakeTableLatestCompactedSnapshotFileHandle` is declared but unused. The comment should either be updated to reflect the current single-file-path implementation, or if multiple paths are planned for future use, this should be clarified with a TODO comment.
   ```suggestion
    *       LakeTableSnapshot}, with actual data in remote file. Currently, only a single file path
    *       is supported to track the latest lake table snapshot.
    *       <!-- TODO: Support multiple file paths for tracking different lake table snapshots in future versions. -->
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Represents lake table snapshot information stored in {@link ZkData.LakeTableZNode}.
+ *
+ * <p>This class supports two storage formats:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Contains the full {@link LakeTableSnapshot} data directly
+ *   <li>Version 2 (current): Contains only the file paths points to the file storing {@link
+ *       LakeTableSnapshot}, with actual data in remote file. It'll contain multiple file paths
+ *       points to track different lake table snapshot.
+ * </ul>
+ *
+ * @see LakeTableJsonSerde for JSON serialization and deserialization
+ */
+public class LakeTable {
+
+    // Version 2 (current):
+    // the pointer to the file stores the latest known LakeTableSnapshot, will be null in
+    // version1
+    @Nullable private final FsPath lakeTableLatestSnapshotFileHandle;
+
+    // the pointer to the file stores the latest known compacted LakeTableSnapshot, will be null in
+    // version1 or no any known compacted LakeTableSnapshot
+    @Nullable private final FsPath lakeTableLatestCompactedSnapshotFileHandle = null;
+

Review Comment:
   The field `lakeTableLatestCompactedSnapshotFileHandle` is declared but never used. It's initialized to `null` and has no getter or setter. Consider removing this unused field or implementing the functionality if it's planned for future use. If it's a placeholder for future functionality, add a TODO comment explaining the intention.
   ```suggestion
   
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java:
##########
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Json serializer and deserializer for {@link LakeTableSnapshot}.
+ *
+ * <p>This serde supports two storage format versions:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Each bucket object contains full information including repeated
+ *       partition names and partition_id in each bucket entry.
+ *   <li>Version 2 (current): Compact format that optimizes layout to avoid duplication:
+ *       <ul>
+ *         <li>Extracts partition names to a top-level "partition_names" map
+ *         <li>Groups buckets by partition_id in "buckets" to avoid repeating partition_id in each
+ *             bucket
+ *         <li>Non-partition table uses array format: "buckets": [...]
+ *         <li>Partition table uses object format: "buckets": {"1": [...], "2": [...]}, "1", "2" is
+ *             for the partition id
+ *       </ul>
+ *       Field names remain the same as Version 1, only the layout is optimized.
+ * </ul>
+ */
+public class LakeTableSnapshotJsonSerde
+        implements JsonSerializer<LakeTableSnapshot>, JsonDeserializer<LakeTableSnapshot> {
+
+    public static final LakeTableSnapshotJsonSerde INSTANCE = new LakeTableSnapshotJsonSerde();
+
+    private static final String VERSION_KEY = "version";
+
+    private static final String SNAPSHOT_ID = "snapshot_id";
+    private static final String TABLE_ID = "table_id";
+    private static final String PARTITION_ID = "partition_id";
+    private static final String BUCKETS = "buckets";
+    private static final String BUCKET_ID = "bucket_id";
+    private static final String LOG_START_OFFSET = "log_start_offset";
+    private static final String LOG_END_OFFSET = "log_end_offset";
+    private static final String MAX_TIMESTAMP = "max_timestamp";
+    private static final String PARTITION_NAME = "partition_name";
+
+    // introduced in v2
+    private static final String PARTITION_NAMES = "partition_names";
+    private static final int VERSION_1 = 1;
+    private static final int VERSION_2 = 2;
+    private static final int CURRENT_VERSION = VERSION_2;
+
+    @Override
+    public void serialize(LakeTableSnapshot lakeTableSnapshot, JsonGenerator generator)
+            throws IOException {
+        generator.writeStartObject();
+        generator.writeNumberField(VERSION_KEY, CURRENT_VERSION);
+        generator.writeNumberField(SNAPSHOT_ID, lakeTableSnapshot.getSnapshotId());
+        generator.writeNumberField(TABLE_ID, lakeTableSnapshot.getTableId());
+
+        // Extract partition names to top-level map to avoid duplication
+        Map<Long, String> partitionNameIdByPartitionId =
+                lakeTableSnapshot.getPartitionNameIdByPartitionId();
+        if (!partitionNameIdByPartitionId.isEmpty()) {
+            generator.writeObjectFieldStart(PARTITION_NAMES);
+            for (Map.Entry<Long, String> entry : partitionNameIdByPartitionId.entrySet()) {
+                generator.writeStringField(String.valueOf(entry.getKey()), entry.getValue());
+            }
+            generator.writeEndObject();
+        }
+
+        // Group buckets by partition_id to avoid repeating partition_id in each bucket
+        Map<Long, List<TableBucket>> partitionBuckets = new HashMap<>();
+        List<TableBucket> nonPartitionBuckets = new ArrayList<>();
+
+        for (TableBucket tableBucket : lakeTableSnapshot.getBucketLogEndOffset().keySet()) {
+            if (tableBucket.getPartitionId() != null) {
+                partitionBuckets
+                        .computeIfAbsent(tableBucket.getPartitionId(), k -> new ArrayList<>())
+                        .add(tableBucket);
+            } else {
+                nonPartitionBuckets.add(tableBucket);
+            }
+        }
+
+        // Serialize buckets: use array for non-partition buckets, object for partition buckets
+        if (!nonPartitionBuckets.isEmpty() || !partitionBuckets.isEmpty()) {
+            if (!partitionBuckets.isEmpty()) {
+                generator.writeObjectFieldStart(BUCKETS);
+                for (Map.Entry<Long, java.util.List<TableBucket>> entry :
+                        partitionBuckets.entrySet()) {
+                    // Partition table:  grouped by partition_id, first write partition
+                    generator.writeArrayFieldStart(String.valueOf(entry.getKey()));
+                    for (TableBucket tableBucket : entry.getValue()) {
+                        // write bucket
+                        writeBucketObject(generator, lakeTableSnapshot, tableBucket);
+                    }
+                    generator.writeEndArray();
+                }
+                generator.writeEndObject();
+            } else {
+                // Non-partition table: use array format directly
+                generator.writeArrayFieldStart(BUCKETS);
+                for (TableBucket tableBucket : nonPartitionBuckets) {
+                    writeBucketObject(generator, lakeTableSnapshot, tableBucket);
+                }
+                generator.writeEndArray();
+            }
+        }
+
+        generator.writeEndObject();
+    }
+
+    /** Helper method to write a bucket object. */
+    private void writeBucketObject(
+            JsonGenerator generator, LakeTableSnapshot lakeTableSnapshot, TableBucket tableBucket)
+            throws IOException {
+        generator.writeStartObject();
+
+        generator.writeNumberField(BUCKET_ID, tableBucket.getBucket());
+
+        // Only include non-null values
+        if (lakeTableSnapshot.getLogStartOffset(tableBucket).isPresent()) {
+            generator.writeNumberField(
+                    LOG_START_OFFSET, lakeTableSnapshot.getLogStartOffset(tableBucket).get());
+        }
+
+        if (lakeTableSnapshot.getLogEndOffset(tableBucket).isPresent()) {
+            generator.writeNumberField(
+                    LOG_END_OFFSET, lakeTableSnapshot.getLogEndOffset(tableBucket).get());
+        }
+
+        if (lakeTableSnapshot.getMaxTimestamp(tableBucket).isPresent()) {
+            generator.writeNumberField(
+                    MAX_TIMESTAMP, lakeTableSnapshot.getMaxTimestamp(tableBucket).get());
+        }
+
+        generator.writeEndObject();
+    }
+
+    @Override
+    public LakeTableSnapshot deserialize(JsonNode node) {
+        int version = node.get(VERSION_KEY).asInt();
+        if (version == VERSION_1) {
+            return deserializeVersion1(node);
+        } else if (version == VERSION_2) {
+            return deserializeVersion2(node);
+        } else {
+            throw new IllegalArgumentException("Unsupported version: " + version);
+        }
+    }
+
+    /** Deserialize Version 1 format (legacy). */
+    private LakeTableSnapshot deserializeVersion1(JsonNode node) {
+        long snapshotId = node.get(SNAPSHOT_ID).asLong();
+        long tableId = node.get(TABLE_ID).asLong();
+        Iterator<JsonNode> buckets = node.get(BUCKETS).elements();
+        Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
+        Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
+        while (buckets.hasNext()) {
+            JsonNode bucket = buckets.next();
+            TableBucket tableBucket;
+            Long partitionId =
+                    bucket.get(PARTITION_ID) != null ? bucket.get(PARTITION_ID).asLong() : null;
+            tableBucket = new TableBucket(tableId, partitionId, bucket.get(BUCKET_ID).asInt());
+
+            if (bucket.get(LOG_START_OFFSET) != null) {
+                bucketLogStartOffset.put(tableBucket, bucket.get(LOG_START_OFFSET).asLong());
+            } else {
+                bucketLogStartOffset.put(tableBucket, null);
+            }
+
+            if (bucket.get(LOG_END_OFFSET) != null) {
+                bucketLogEndOffset.put(tableBucket, bucket.get(LOG_END_OFFSET).asLong());
+            } else {
+                bucketLogEndOffset.put(tableBucket, null);
+            }
+
+            if (bucket.get(MAX_TIMESTAMP) != null) {
+                bucketMaxTimestamp.put(tableBucket, bucket.get(MAX_TIMESTAMP).asLong());
+            } else {
+                bucketMaxTimestamp.put(tableBucket, null);
+            }
+
+            if (partitionId != null && bucket.get(PARTITION_NAME) != null) {
+                partitionNameIdByPartitionId.put(
+                        tableBucket.getPartitionId(), bucket.get(PARTITION_NAME).asText());
+            }
+        }
+        return new LakeTableSnapshot(
+                snapshotId,
+                tableId,
+                bucketLogStartOffset,
+                bucketLogEndOffset,
+                bucketMaxTimestamp,
+                partitionNameIdByPartitionId);
+    }
+
+    /** Deserialize Version 2 format (compact layout). */
+    private LakeTableSnapshot deserializeVersion2(JsonNode node) {
+        long snapshotId = node.get(SNAPSHOT_ID).asLong();
+        long tableId = node.get(TABLE_ID).asLong();
+
+        // Load partition names from top-level map
+        Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
+        if (node.has(PARTITION_NAMES) && node.get(PARTITION_NAMES) != null) {
+            JsonNode partitionsNode = node.get(PARTITION_NAMES);
+            Iterator<Map.Entry<String, JsonNode>> partitions = partitionsNode.fields();
+            while (partitions.hasNext()) {
+                Map.Entry<String, JsonNode> entry = partitions.next();
+                Long partitionId = Long.parseLong(entry.getKey());
+                String partitionName = entry.getValue().asText();
+                partitionNameIdByPartitionId.put(partitionId, partitionName);
+            }
+        }
+
+        Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
+
+        // Deserialize buckets: array format for non-partition table, object format for partition
+        // table
+        if (node.has(BUCKETS) && node.get(BUCKETS) != null) {
+            JsonNode bucketsNode = node.get(BUCKETS);
+            if (bucketsNode.isArray()) {
+                // Non-partition table: array format
+                Iterator<JsonNode> buckets = bucketsNode.elements();
+                while (buckets.hasNext()) {
+                    JsonNode bucket = buckets.next();
+                    TableBucket tableBucket =
+                            new TableBucket(tableId, bucket.get(BUCKET_ID).asInt());
+                    readBucketFields(
+                            bucket,
+                            tableBucket,
+                            bucketLogStartOffset,
+                            bucketLogEndOffset,
+                            bucketMaxTimestamp);
+                }
+            } else {
+                // Partition table: object format grouped by partition_id
+                Iterator<Map.Entry<String, JsonNode>> partitions = bucketsNode.fields();
+                while (partitions.hasNext()) {
+                    Map.Entry<String, JsonNode> entry = partitions.next();
+                    String partitionKey = entry.getKey();
+                    Long actualPartitionId = Long.parseLong(partitionKey);

Review Comment:
   Potential uncaught 'java.lang.NumberFormatException'.
   ```suggestion
                       Long actualPartitionId = null;
                       try {
                           actualPartitionId = Long.parseLong(partitionKey);
                       } catch (NumberFormatException e) {
                           System.err.println("Warning: Invalid partition key '" + partitionKey + "' encountered during deserialization. Skipping this partition.");
                           continue;
                       }
   ```
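   For context, the Version 2 layout described in the class Javadoc above (whose partition-id keys are parsed here) would look roughly like the following for a partitioned table; the values are illustrative only and not taken from the PR:
   ```json
   {
     "version": 2,
     "snapshot_id": 5,
     "table_id": 42,
     "partition_names": {"1001": "2024-01-01", "1002": "2024-01-02"},
     "buckets": {
       "1001": [{"bucket_id": 0, "log_start_offset": 0, "log_end_offset": 100, "max_timestamp": 1700000000000}],
       "1002": [{"bucket_id": 0, "log_end_offset": 50}]
     }
   }
   ```
   For a non-partitioned table, "buckets" is a plain array of the same bucket objects instead of an object keyed by partition id.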



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FSDataOutputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.FlussPaths;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.fluss.metrics.registry.MetricRegistry.LOG;
+
+/** The helper to handle {@link LakeTable}. */
+public class LakeTableHelper {
+
+    private final ZooKeeperClient zkClient;
+    private final String remoteDataDir;
+
+    public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) {
+        this.zkClient = zkClient;
+        this.remoteDataDir = remoteDataDir;
+    }
+
+    /**
+     * Upserts a lake table snapshot for the given table.
+     *
+     * <p>This method merges the new snapshot with the existing one (if any) and stores it(data in
+     * remote file, the remote file path in ZK).
+     *
+     * @param tableId the table ID
+     * @param tablePath the table path
+     * @param lakeTableSnapshot the new snapshot to upsert
+     * @throws Exception if the operation fails
+     */
+    public void upsertLakeTable(
+            long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot)
+            throws Exception {
+        Optional<LakeTable> optPreviousLakeTable = zkClient.getLakeTable(tableId);
+        // Merge with previous snapshot if exists
+        if (optPreviousLakeTable.isPresent()) {
+            lakeTableSnapshot =
+                    mergeLakeTable(
+                            optPreviousLakeTable.get().toLakeTableSnapshot(), lakeTableSnapshot);
+        }
+
+        FsPath lakeTableSnapshotFsPath =
+                storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot);
+        LakeTable lakeTable = new LakeTable(lakeTableSnapshotFsPath);
+        try {
+            zkClient.upsertLakeTable(tableId, lakeTable, optPreviousLakeTable.isPresent());
+        } catch (Exception e) {
+            LOG.warn("Failed to upsert lake table snapshot to zk.", e);
+            // delete the new lake table snapshot file
+            deleteFile(lakeTableSnapshotFsPath);
+            throw e;
+        }
+
+        if (optPreviousLakeTable.isPresent()) {
+            FsPath previousLakeSnapshotFsPath =
+                    optPreviousLakeTable.get().getLakeTableLatestSnapshotFileHandle();
+            if (previousLakeSnapshotFsPath != null) {
+                deleteFile(previousLakeSnapshotFsPath);
+            }
+        }
+    }
+
+    private LakeTableSnapshot mergeLakeTable(
+            LakeTableSnapshot previousLakeTableSnapshot,
+            LakeTableSnapshot newLakeTableTableSnapshot) {

Review Comment:
   The parameter name `newLakeTableTableSnapshot` contains a typo with duplicated "Table". It should be `newLakeTableSnapshot` for consistency and clarity.
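   For illustration only (not part of the review suggestion), the rename would read as follows; the trivial body is a placeholder, the real merge logic is the one in the PR:
   ```java
   private LakeTableSnapshot mergeLakeTable(
           LakeTableSnapshot previousLakeTableSnapshot,
           LakeTableSnapshot newLakeTableSnapshot) {
       // placeholder body: the actual method merges previousLakeTableSnapshot
       // into newLakeTableSnapshot; only the parameter name changes here
       return newLakeTableSnapshot;
   }
   ```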



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FSDataOutputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.FlussPaths;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.fluss.metrics.registry.MetricRegistry.LOG;
+
+/** The helper to handle {@link LakeTable}. */
+public class LakeTableHelper {
+
+    private final ZooKeeperClient zkClient;
+    private final String remoteDataDir;
+
+    public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) {
+        this.zkClient = zkClient;
+        this.remoteDataDir = remoteDataDir;
+    }
+
+    /**
+     * Upserts a lake table snapshot for the given table.
+     *
+     * <p>This method merges the new snapshot with the existing one (if any) and stores it(data in

Review Comment:
   Minor grammar issue in the JavaDoc. Change "stores it(data in" to "stores it (data in" - add a space before the opening parenthesis for proper formatting.
   ```suggestion
        * <p>This method merges the new snapshot with the existing one (if any) and stores it (data in
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java:
##########
@@ -407,6 +399,11 @@ CompletableFuture<Void> stopServices() {
                 exception = ExceptionUtils.firstOrSuppressed(t, exception);
             }
 
+            if (ioExecutor != null) {
+                // shutdown io executor
+                ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);

Review Comment:
   The `ioExecutor` shutdown should be wrapped in a try-catch block to handle potential exceptions, similar to other resource cleanup operations in this method. If an exception is thrown during executor shutdown, it could prevent subsequent cleanup operations from executing.
   
   ```java
   try {
       if (ioExecutor != null) {
           // shutdown io executor
           ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
       }
   } catch (Throwable t) {
       exception = ExceptionUtils.firstOrSuppressed(t, exception);
   }
   ```
   ```suggestion
               try {
                   if (ioExecutor != null) {
                       // shutdown io executor
                        ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor);
                   }
               } catch (Throwable t) {
                   exception = ExceptionUtils.firstOrSuppressed(t, exception);
   ```



##########
fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.ZooKeeperUtils;
+import org.apache.fluss.server.zk.data.TableRegistration;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for {@link LakeTableHelper}. */
+class LakeTableHelperTest {
+
+    @RegisterExtension
+    public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    private static ZooKeeperClient zookeeperClient;
+
+    @BeforeAll
+    static void beforeAll() {
+        zookeeperClient =
+                ZOO_KEEPER_EXTENSION_WRAPPER
+                        .getCustomExtension()
+                        .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+    }
+
+    @AfterEach
+    void afterEach() {
+        ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
+    }
+
+    @AfterAll
+    static void afterAll() {
+        zookeeperClient.close();
+    }
+
+    @Test
+    void testUpsertLakeTableCompatible(@TempDir Path tempDir) throws Exception {
+        // Create a ZooKeeperClient with REMOTE_DATA_DIR configuration
+        Configuration conf = new Configuration();
+        conf.setString(
+                ConfigOptions.ZOOKEEPER_ADDRESS,
+                ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString());
+        LakeTableHelper lakeTableHelper = new LakeTableHelper(zookeeperClient, tempDir.toString());
+        try (ZooKeeperClient zooKeeperClient =
+                ZooKeeperUtils.startZookeeperClient(conf, NOPErrorHandler.INSTANCE)) {
+            // first create a table
+            long tableId = 1;
+            TablePath tablePath = TablePath.of("test_db", "test_table");
+            TableRegistration tableReg =
+                    new TableRegistration(
+                            tableId,
+                            "test table",
+                            Collections.emptyList(),
+                            new TableDescriptor.TableDistribution(
+                                    1, Collections.singletonList("a")),
+                            Collections.emptyMap(),
+                            Collections.emptyMap(),
+                            System.currentTimeMillis(),
+                            System.currentTimeMillis());
+            zookeeperClient.registerTable(tablePath, tableReg);
+
+            // Create a legacy version 1 LakeTableSnapshot (full data in ZK)
+            long snapshotId = 1L;
+            Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
+            bucketLogStartOffset.put(new TableBucket(tableId, 0), 10L);
+            bucketLogStartOffset.put(new TableBucket(tableId, 1), 20L);
+
+            Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+            bucketLogEndOffset.put(new TableBucket(tableId, 0), 100L);
+            bucketLogEndOffset.put(new TableBucket(tableId, 1), 200L);
+
+            Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
+            bucketMaxTimestamp.put(new TableBucket(tableId, 0), 1000L);
+            bucketMaxTimestamp.put(new TableBucket(tableId, 1), 2000L);
+
+            Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
+            LakeTableSnapshot lakeTableSnapshot =
+                    new LakeTableSnapshot(
+                            snapshotId,
+                            tableId,
+                            bucketLogStartOffset,
+                            bucketLogEndOffset,
+                            bucketMaxTimestamp,
+                            partitionNameIdByPartitionId);
+            // Write version 1 format data directly to ZK (simulating old system behavior)
+            String zkPath = ZkData.LakeTableZNode.path(tableId);
+            byte[] version1Data = LakeTableSnapshotJsonSerde.toJsonVersion1(lakeTableSnapshot);
+            zooKeeperClient
+                    .getCuratorClient()
+                    .create()
+                    .creatingParentsIfNeeded()
+                    .forPath(zkPath, version1Data);
+
+            // Verify version 1 data can be read
+            Optional<LakeTable> optionalLakeTable = zooKeeperClient.getLakeTable(tableId);
+            assertThat(optionalLakeTable).isPresent();
+            LakeTable lakeTable = optionalLakeTable.get();
+            assertThat(lakeTable.toLakeTableSnapshot()).isEqualTo(lakeTableSnapshot);
+
+            // Test: Call upsertLakeTableSnapshot with new snapshot data
+            // This should read the old version 1 data, merge it, and write as version 2
+            Map<TableBucket, Long> newBucketLogEndOffset = new HashMap<>();
+            newBucketLogEndOffset.put(new TableBucket(tableId, 0), 1500L); // Updated offset
+            newBucketLogEndOffset.put(new TableBucket(tableId, 1), 2000L); // new offset
+
+            long snapshot2Id = 2L;
+            LakeTableSnapshot snapshot2 =
+                    new LakeTableSnapshot(
+                            snapshot2Id,
+                            tableId,
+                            Collections.emptyMap(),
+                            newBucketLogEndOffset,
+                            Collections.emptyMap(),
+                            Collections.emptyMap());
+            lakeTableHelper.upsertLakeTable(tableId, tablePath, snapshot2);
+
+            // Verify: New version 2 data can be read
+            Optional<LakeTable> optLakeTableAfter = zooKeeperClient.getLakeTable(tableId);
+            assertThat(optLakeTableAfter).isPresent();
+            LakeTable lakeTableAfter = optLakeTableAfter.get();
+            assertThat(lakeTableAfter.getLakeTableLatestSnapshotFileHandle())
+                    .isNotNull(); // Version 2 has file path
+
+            // Verify: The lake snapshot file exists
+            FsPath snapshot2FileHandle = lakeTableAfter.getLakeTableLatestSnapshotFileHandle();
+            FileSystem fileSystem = snapshot2FileHandle.getFileSystem();
+            assertThat(fileSystem.exists(snapshot2FileHandle)).isTrue();
+
+            Optional<LakeTableSnapshot> optMergedSnapshot =
+                    zooKeeperClient.getLakeTableSnapshot(tableId);
+            assertThat(optMergedSnapshot).isPresent();
+            LakeTableSnapshot mergedSnapshot = optMergedSnapshot.get();
+
+            // verify the snapshot should merge previous snapshot
+            assertThat(mergedSnapshot.getSnapshotId()).isEqualTo(snapshot2Id);
+            assertThat(mergedSnapshot.getTableId()).isEqualTo(tableId);
+            assertThat(mergedSnapshot.getBucketLogStartOffset()).isEqualTo(bucketLogStartOffset);
+
+            Map<TableBucket, Long> expectedBucketLogEndOffset = new HashMap<>(bucketLogEndOffset);
+            expectedBucketLogEndOffset.putAll(newBucketLogEndOffset);
+            assertThat(mergedSnapshot.getBucketLogEndOffset())
+                    .isEqualTo(expectedBucketLogEndOffset);
+            assertThat(mergedSnapshot.getBucketMaxTimestamp()).isEqualTo(bucketMaxTimestamp);
+            assertThat(mergedSnapshot.getPartitionNameIdByPartitionId())
+                    .isEqualTo(partitionNameIdByPartitionId);
+
+            // add a new snapshot 3 again, verify snapshot
+            long snapshot3Id = 2L;

Review Comment:
   The snapshot ID should be `3L` instead of `2L`. This is snapshot 3, and using the same ID as snapshot 2 would be incorrect and could lead to confusion or incorrect behavior in the test.
   ```suggestion
               long snapshot3Id = 3L;
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java:
##########
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Json serializer and deserializer for {@link LakeTableSnapshot}.
+ *
+ * <p>This serde supports two storage format versions:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Each bucket object contains full information including repeated
+ *       partition names and partition_id in each bucket entry.
+ *   <li>Version 2 (current): Compact format that optimizes layout to avoid duplication:
+ *       <ul>
+ *         <li>Extracts partition names to a top-level "partition_names" map
+ *         <li>Groups buckets by partition_id in "buckets" to avoid repeating partition_id in each
+ *             bucket
+ *         <li>Non-partition table uses array format: "buckets": [...]
+ *         <li>Partition table uses object format: "buckets": {"1": [...], "2": [...]}, "1", "2" is
+ *             for the partition id
+ *       </ul>
+ *       Field names remain the same as Version 1, only the layout is optimized.
+ * </ul>
+ */
+public class LakeTableSnapshotJsonSerde
+        implements JsonSerializer<LakeTableSnapshot>, JsonDeserializer<LakeTableSnapshot> {
+
+    public static final LakeTableSnapshotJsonSerde INSTANCE = new LakeTableSnapshotJsonSerde();
+
+    private static final String VERSION_KEY = "version";
+
+    private static final String SNAPSHOT_ID = "snapshot_id";
+    private static final String TABLE_ID = "table_id";
+    private static final String PARTITION_ID = "partition_id";
+    private static final String BUCKETS = "buckets";
+    private static final String BUCKET_ID = "bucket_id";
+    private static final String LOG_START_OFFSET = "log_start_offset";
+    private static final String LOG_END_OFFSET = "log_end_offset";
+    private static final String MAX_TIMESTAMP = "max_timestamp";
+    private static final String PARTITION_NAME = "partition_name";
+
+    // introduced in v2
+    private static final String PARTITION_NAMES = "partition_names";
+    private static final int VERSION_1 = 1;
+    private static final int VERSION_2 = 2;
+    private static final int CURRENT_VERSION = VERSION_2;
+
+    @Override
+    public void serialize(LakeTableSnapshot lakeTableSnapshot, JsonGenerator generator)
+            throws IOException {
+        generator.writeStartObject();
+        generator.writeNumberField(VERSION_KEY, CURRENT_VERSION);
+        generator.writeNumberField(SNAPSHOT_ID, lakeTableSnapshot.getSnapshotId());
+        generator.writeNumberField(TABLE_ID, lakeTableSnapshot.getTableId());
+
+        // Extract partition names to top-level map to avoid duplication
+        Map<Long, String> partitionNameIdByPartitionId =
+                lakeTableSnapshot.getPartitionNameIdByPartitionId();
+        if (!partitionNameIdByPartitionId.isEmpty()) {
+            generator.writeObjectFieldStart(PARTITION_NAMES);
+            for (Map.Entry<Long, String> entry : partitionNameIdByPartitionId.entrySet()) {
+                generator.writeStringField(String.valueOf(entry.getKey()), entry.getValue());
+            }
+            generator.writeEndObject();
+        }
+
+        // Group buckets by partition_id to avoid repeating partition_id in each bucket
+        Map<Long, List<TableBucket>> partitionBuckets = new HashMap<>();
+        List<TableBucket> nonPartitionBuckets = new ArrayList<>();
+
+        for (TableBucket tableBucket : lakeTableSnapshot.getBucketLogEndOffset().keySet()) {
+            if (tableBucket.getPartitionId() != null) {
+                partitionBuckets
+                        .computeIfAbsent(tableBucket.getPartitionId(), k -> new ArrayList<>())
+                        .add(tableBucket);
+            } else {
+                nonPartitionBuckets.add(tableBucket);
+            }
+        }
+
+        // Serialize buckets: use array for non-partition buckets, object for partition buckets
+        if (!nonPartitionBuckets.isEmpty() || !partitionBuckets.isEmpty()) {
+            if (!partitionBuckets.isEmpty()) {
+                generator.writeObjectFieldStart(BUCKETS);
+                for (Map.Entry<Long, java.util.List<TableBucket>> entry :
+                        partitionBuckets.entrySet()) {
+                    // Partition table:  grouped by partition_id, first write partition
+                    generator.writeArrayFieldStart(String.valueOf(entry.getKey()));
+                    for (TableBucket tableBucket : entry.getValue()) {
+                        // write bucket
+                        writeBucketObject(generator, lakeTableSnapshot, tableBucket);
+                    }
+                    generator.writeEndArray();
+                }
+                generator.writeEndObject();
+            } else {
+                // Non-partition table: use array format directly
+                generator.writeArrayFieldStart(BUCKETS);
+                for (TableBucket tableBucket : nonPartitionBuckets) {
+                    writeBucketObject(generator, lakeTableSnapshot, tableBucket);
+                }
+                generator.writeEndArray();
+            }
+        }
+
+        generator.writeEndObject();
+    }
+
+    /** Helper method to write a bucket object. */
+    private void writeBucketObject(
+            JsonGenerator generator, LakeTableSnapshot lakeTableSnapshot, TableBucket tableBucket)
+            throws IOException {
+        generator.writeStartObject();
+
+        generator.writeNumberField(BUCKET_ID, tableBucket.getBucket());
+
+        // Only include non-null values
+        if (lakeTableSnapshot.getLogStartOffset(tableBucket).isPresent()) {
+            generator.writeNumberField(
+                    LOG_START_OFFSET, lakeTableSnapshot.getLogStartOffset(tableBucket).get());
+        }
+
+        if (lakeTableSnapshot.getLogEndOffset(tableBucket).isPresent()) {
+            generator.writeNumberField(
+                    LOG_END_OFFSET, lakeTableSnapshot.getLogEndOffset(tableBucket).get());
+        }
+
+        if (lakeTableSnapshot.getMaxTimestamp(tableBucket).isPresent()) {
+            generator.writeNumberField(
+                    MAX_TIMESTAMP, lakeTableSnapshot.getMaxTimestamp(tableBucket).get());
+        }
+
+        generator.writeEndObject();
+    }
+
+    @Override
+    public LakeTableSnapshot deserialize(JsonNode node) {
+        int version = node.get(VERSION_KEY).asInt();
+        if (version == VERSION_1) {
+            return deserializeVersion1(node);
+        } else if (version == VERSION_2) {
+            return deserializeVersion2(node);
+        } else {
+            throw new IllegalArgumentException("Unsupported version: " + version);
+        }
+    }
+
+    /** Deserialize Version 1 format (legacy). */
+    private LakeTableSnapshot deserializeVersion1(JsonNode node) {
+        long snapshotId = node.get(SNAPSHOT_ID).asLong();
+        long tableId = node.get(TABLE_ID).asLong();
+        Iterator<JsonNode> buckets = node.get(BUCKETS).elements();
+        Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
+        Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
+        while (buckets.hasNext()) {
+            JsonNode bucket = buckets.next();
+            TableBucket tableBucket;
+            Long partitionId =
+                    bucket.get(PARTITION_ID) != null ? bucket.get(PARTITION_ID).asLong() : null;
+            tableBucket = new TableBucket(tableId, partitionId, bucket.get(BUCKET_ID).asInt());
+
+            if (bucket.get(LOG_START_OFFSET) != null) {
+                bucketLogStartOffset.put(tableBucket, bucket.get(LOG_START_OFFSET).asLong());
+            } else {
+                bucketLogStartOffset.put(tableBucket, null);
+            }
+
+            if (bucket.get(LOG_END_OFFSET) != null) {
+                bucketLogEndOffset.put(tableBucket, bucket.get(LOG_END_OFFSET).asLong());
+            } else {
+                bucketLogEndOffset.put(tableBucket, null);
+            }
+
+            if (bucket.get(MAX_TIMESTAMP) != null) {
+                bucketMaxTimestamp.put(tableBucket, bucket.get(MAX_TIMESTAMP).asLong());
+            } else {
+                bucketMaxTimestamp.put(tableBucket, null);
+            }
+
+            if (partitionId != null && bucket.get(PARTITION_NAME) != null) {
+                partitionNameIdByPartitionId.put(
+                        tableBucket.getPartitionId(), bucket.get(PARTITION_NAME).asText());
+            }
+        }
+        return new LakeTableSnapshot(
+                snapshotId,
+                tableId,
+                bucketLogStartOffset,
+                bucketLogEndOffset,
+                bucketMaxTimestamp,
+                partitionNameIdByPartitionId);
+    }
+
+    /** Deserialize Version 2 format (compact layout). */
+    private LakeTableSnapshot deserializeVersion2(JsonNode node) {
+        long snapshotId = node.get(SNAPSHOT_ID).asLong();
+        long tableId = node.get(TABLE_ID).asLong();
+
+        // Load partition names from top-level map
+        Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
+        if (node.has(PARTITION_NAMES) && node.get(PARTITION_NAMES) != null) {
+            JsonNode partitionsNode = node.get(PARTITION_NAMES);
+            Iterator<Map.Entry<String, JsonNode>> partitions = partitionsNode.fields();
+            while (partitions.hasNext()) {
+                Map.Entry<String, JsonNode> entry = partitions.next();
+                Long partitionId = Long.parseLong(entry.getKey());

Review Comment:
   Potential uncaught 'java.lang.NumberFormatException'.
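   A minimal sketch of one way to guard this parse; the helper name and the choice to fail fast with a clearer exception (rather than skipping the entry) are assumptions for illustration, not part of the PR:
   ```java
   // Parses a partition-id key taken from the "partition_names" or "buckets"
   // maps of the version 2 JSON, turning a malformed key into a descriptive
   // error instead of a bare NumberFormatException.
   private static long parsePartitionId(String partitionKey) {
       try {
           return Long.parseLong(partitionKey);
       } catch (NumberFormatException e) {
           throw new IllegalArgumentException(
                   "Invalid partition id key in lake table snapshot JSON: " + partitionKey, e);
       }
   }
   ```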



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

