Copilot commented on code in PR #2037:
URL: https://github.com/apache/fluss/pull/2037#discussion_r2600833598


##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Json serializer and deserializer for {@link LakeTableSnapshot}.
+ *
+ * <p>This serde supports two storage format versions:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Each bucket object contains full information including repeated
+ *       partition names and partition_id in each bucket entry.
+ *   <li>Version 2 (current): Compact format that optimizes layout:
+ *       <ul>
+ *         <li>Non-partition table uses array format: "buckets": [100, 200, 300], where array index
+ *             represents bucket id (0, 1, 2) and value represents log_end_offset. For buckets
+ *             without endoffset, -1 is written. Missing bucket ids in the sequence are also filled
+ *             with -1.
+ *         <li>Partition table uses object format: "buckets": {"1": [100, 200], "2": [300, 400]},
+ *             where key is partition id, array index represents bucket id (0, 1) and value
+ *             represents log_end_offset. For buckets without endoffset, -1 is written. Missing

Review Comment:
   Typo in comment: "endoffset" should be "end offset" (two words); it occurs twice in this Javadoc comment.
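
   For readers skimming the review, the version 2 layout described in that Javadoc would look roughly like the following (a minimal sketch with illustrative values; only the "buckets" shapes come from the Javadoc, the "snapshot_id" field name is an assumption):

   ```json
   {"snapshot_id": 5, "buckets": [100, 200, -1]}
   {"snapshot_id": 5, "buckets": {"1": [100, 200], "2": [300, 400]}}
   ```

   The first object is a non-partitioned table (bucket 2 has no known end offset, hence -1); the second is a partitioned table keyed by partition id.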



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FSDataOutputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.FlussPaths;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.fluss.metrics.registry.MetricRegistry.LOG;
+
+/** The helper to handle {@link LakeTable}. */
+public class LakeTableHelper {
+
+    private final ZooKeeperClient zkClient;
+    private final String remoteDataDir;
+
+    public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) {
+        this.zkClient = zkClient;
+        this.remoteDataDir = remoteDataDir;
+    }
+
+    /**
+     * Upserts a lake table snapshot for the given table.
+     *
+     * <p>This method merges the new snapshot with the existing one (if any) and stores it (data in
+     * remote file, the remote file path in ZK).
+     *
+     * @param tableId the table ID
+     * @param tablePath the table path
+     * @param lakeTableSnapshot the new snapshot to upsert
+     * @throws Exception if the operation fails
+     */
+    public void upsertLakeTable(
+            long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot)
+            throws Exception {
+        Optional<LakeTable> optPreviousLakeTable = zkClient.getLakeTable(tableId);
+        // Merge with previous snapshot if exists
+        if (optPreviousLakeTable.isPresent()) {
+            lakeTableSnapshot =
+                    mergeLakeTable(
+                            optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot);
+        }
+
+        // store the lake table snapshot into a file
+        FsPath lakeTableSnapshotFsPath =
+                storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot);
+
+        LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata =
+                new LakeTable.LakeSnapshotMetadata(
+                        lakeTableSnapshot.getSnapshotId(),
+                        // use the lake table snapshot file as the tiered offsets file since
+                        // the the table snapshot file will contain the tiered log end offsets

Review Comment:
   Typo in comment: "the the table" should be "the table" (duplicate word).



##########
fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.ZooKeeperUtils;
+import org.apache.fluss.server.zk.data.TableRegistration;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The UT for {@link LakeTableHelper}. */
+class LakeTableHelperTest {
+
+    @RegisterExtension
+    public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    private static ZooKeeperClient zookeeperClient;
+
+    @BeforeAll
+    static void beforeAll() {
+        zookeeperClient =
+                ZOO_KEEPER_EXTENSION_WRAPPER
+                        .getCustomExtension()
+                        .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+    }
+
+    @AfterEach
+    void afterEach() {
+        ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
+    }
+
+    @AfterAll
+    static void afterAll() {
+        zookeeperClient.close();
+    }
+
+    @Test
+    void testUpsertLakeTableCompatible(@TempDir Path tempDir) throws Exception {
+        // Create a ZooKeeperClient with REMOTE_DATA_DIR configuration
+        Configuration conf = new Configuration();
+        conf.setString(
+                ConfigOptions.ZOOKEEPER_ADDRESS,
+                ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString());
+        LakeTableHelper lakeTableHelper = new LakeTableHelper(zookeeperClient, tempDir.toString());
+        try (ZooKeeperClient zooKeeperClient =
+                ZooKeeperUtils.startZookeeperClient(conf, NOPErrorHandler.INSTANCE)) {
+            // first create a table
+            long tableId = 1;
+            TablePath tablePath = TablePath.of("test_db", "test_table");
+            TableRegistration tableReg =
+                    new TableRegistration(
+                            tableId,
+                            "test table",
+                            Collections.emptyList(),
+                            new TableDescriptor.TableDistribution(
+                                    1, Collections.singletonList("a")),
+                            Collections.emptyMap(),
+                            Collections.emptyMap(),
+                            System.currentTimeMillis(),
+                            System.currentTimeMillis());
+            zookeeperClient.registerTable(tablePath, tableReg);
+
+            // Create a legacy version 1 LakeTableSnapshot (full data in ZK)
+            long snapshotId = 1L;
+
+            Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+            bucketLogEndOffset.put(new TableBucket(tableId, 0), 100L);
+            bucketLogEndOffset.put(new TableBucket(tableId, 1), 200L);
+
+            LakeTableSnapshot lakeTableSnapshot =
+                    new LakeTableSnapshot(snapshotId, bucketLogEndOffset);
+            // Write version 1 format data directly to ZK (simulating old system behavior)
+            String zkPath = ZkData.LakeTableZNode.path(tableId);
+            byte[] version1Data =
+                    LakeTableSnapshotJsonSerde.toJsonVersion1(lakeTableSnapshot, tableId);
+            zooKeeperClient
+                    .getCuratorClient()
+                    .create()
+                    .creatingParentsIfNeeded()
+                    .forPath(zkPath, version1Data);
+
+            // Verify version 1 data can be read
+            Optional<LakeTable> optionalLakeTable = zooKeeperClient.getLakeTable(tableId);
+            assertThat(optionalLakeTable).isPresent();
+            LakeTable lakeTable = optionalLakeTable.get();
+            assertThat(lakeTable.getLatestTableSnapshot()).isEqualTo(lakeTableSnapshot);
+
+            // Test: Call upsertLakeTableSnapshot with new snapshot data
+            // This should read the old version 1 data, merge it, and write as version 2
+            Map<TableBucket, Long> newBucketLogEndOffset = new HashMap<>();
+            newBucketLogEndOffset.put(new TableBucket(tableId, 0), 1500L); // Updated offset
+            newBucketLogEndOffset.put(new TableBucket(tableId, 1), 2000L); // new offset
+
+            long snapshot2Id = 2L;
+            LakeTableSnapshot snapshot2 = new LakeTableSnapshot(snapshot2Id, newBucketLogEndOffset);
+            lakeTableHelper.upsertLakeTable(tableId, tablePath, snapshot2);
+
+            // Verify: New version 2 data can be read
+            Optional<LakeTable> optLakeTableAfter = zooKeeperClient.getLakeTable(tableId);
+            assertThat(optLakeTableAfter).isPresent();
+            LakeTable lakeTableAfter = optLakeTableAfter.get();
+            assertThat(lakeTableAfter.getLakeTableLatestSnapshot())
+                    .isNotNull(); // Version 2 has file path
+
+            // Verify: The lake snapshot file exists
+            FsPath snapshot2FileHandle =
+                    lakeTableAfter.getLakeTableLatestSnapshot().getReadableOffsetsFilePath();
+            FileSystem fileSystem = snapshot2FileHandle.getFileSystem();
+            assertThat(fileSystem.exists(snapshot2FileHandle)).isTrue();
+
+            Optional<LakeTableSnapshot> optMergedSnapshot =
+                    zooKeeperClient.getLakeTableSnapshot(tableId);
+            assertThat(optMergedSnapshot).isPresent();
+            LakeTableSnapshot mergedSnapshot = optMergedSnapshot.get();
+
+            // verify the snapshot should merge previous snapshot
+            assertThat(mergedSnapshot.getSnapshotId()).isEqualTo(snapshot2Id);
+            Map<TableBucket, Long> expectedBucketLogEndOffset = new HashMap<>(bucketLogEndOffset);
+            expectedBucketLogEndOffset.putAll(newBucketLogEndOffset);
+            assertThat(mergedSnapshot.getBucketLogEndOffset())
+                    .isEqualTo(expectedBucketLogEndOffset);
+
+            // add a new snapshot 3 again, verify snapshot
+            long snapshot3Id = 3L;
+            LakeTableSnapshot snapshot3 = new LakeTableSnapshot(snapshot3Id, newBucketLogEndOffset);
+            lakeTableHelper.upsertLakeTable(tableId, tablePath, snapshot3);
+            // verify snapshot 3 is discard

Review Comment:
   Typo in comment: "discard" should be "discarded" (past tense) to match the 
assertion that follows.



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FSDataOutputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.FlussPaths;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.fluss.metrics.registry.MetricRegistry.LOG;
+
+/** The helper to handle {@link LakeTable}. */
+public class LakeTableHelper {
+
+    private final ZooKeeperClient zkClient;
+    private final String remoteDataDir;
+
+    public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) {
+        this.zkClient = zkClient;
+        this.remoteDataDir = remoteDataDir;
+    }
+
+    /**
+     * Upserts a lake table snapshot for the given table.
+     *
+     * <p>This method merges the new snapshot with the existing one (if any) and stores it (data in
+     * remote file, the remote file path in ZK).
+     *
+     * @param tableId the table ID
+     * @param tablePath the table path
+     * @param lakeTableSnapshot the new snapshot to upsert
+     * @throws Exception if the operation fails
+     */
+    public void upsertLakeTable(
+            long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot)
+            throws Exception {
+        Optional<LakeTable> optPreviousLakeTable = zkClient.getLakeTable(tableId);
+        // Merge with previous snapshot if exists
+        if (optPreviousLakeTable.isPresent()) {
+            lakeTableSnapshot =
+                    mergeLakeTable(
+                            optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot);
+        }
+
+        // store the lake table snapshot into a file
+        FsPath lakeTableSnapshotFsPath =
+                storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot);
+
+        LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata =
+                new LakeTable.LakeSnapshotMetadata(
+                        lakeTableSnapshot.getSnapshotId(),
+                        // use the lake table snapshot file as the tiered offsets file since
+                        // the the table snapshot file will contain the tiered log end offsets
+                        lakeTableSnapshotFsPath,
+                        // currently, readableOffsetsFilePath is always same with
+                        // tieredOffsetsFilePath, but in the future we'll commit a readable offsets
+                        // separately to mark the what's the readable offsets for a snapshot since
+                        // in paimon dv table, tiered log end offsets is not same with readable
+                        // offsets
+                        lakeTableSnapshotFsPath);
+
+        // currently, we keep only one lake snapshot metadata in zk,
+        // todo: in solve paimon dv union read issue #2121, we'll keep multiple lake snapshot

Review Comment:
   Grammar issue: "in solve paimon dv union read issue" should be "to solve 
paimon dv union read issue" or "in solving paimon dv union read issue".
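
   Beyond the grammar nit: `mergeLakeTable` is not quoted in this review, but judging from the test expectations elsewhere in this thread (`expectedBucketLogEndOffset` is the previous map with the new offsets put on top), the merge is a map union in which the new snapshot wins. A minimal sketch under that assumption, not the actual implementation:

   ```java
   // Hypothetical sketch of the merge semantics implied by LakeTableHelperTest:
   // keep the new snapshot id, retain offsets for buckets only present in the
   // previous snapshot, and let the new snapshot's offsets win on conflict.
   private LakeTableSnapshot mergeLakeTable(
           LakeTableSnapshot previous, LakeTableSnapshot current) {
       Map<TableBucket, Long> merged = new HashMap<>(previous.getBucketLogEndOffset());
       merged.putAll(current.getBucketLogEndOffset());
       return new LakeTableSnapshot(current.getSnapshotId(), merged);
   }
   ```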



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Json serializer and deserializer for {@link LakeTableSnapshot}.
+ *
+ * <p>This serde supports two storage format versions:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Each bucket object contains full information including repeated
+ *       partition names and partition_id in each bucket entry.
+ *   <li>Version 2 (current): Compact format that optimizes layout:
+ *       <ul>
+ *         <li>Non-partition table uses array format: "buckets": [100, 200, 300], where array index
+ *             represents bucket id (0, 1, 2) and value represents log_end_offset. For buckets
+ *             without endoffset, -1 is written. Missing bucket ids in the sequence are also filled

Review Comment:
   Typo in comment: "endoffset" should be "end offset" (two words).



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshot.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** The snapshot info for a table. */
+public class LakeTableSnapshot {
+
+    // the last committed snapshot id in lake
+    private final long snapshotId;
+
+    // the log offset of the bucket
+    // mapping from bucket id to log end offset or max timestamp,
+    // will be null if log offset is unknown such as reading the snapshot of primary key table
+    private final Map<TableBucket, Long> bucketLogEndOffset;
+
+    public LakeTableSnapshot(long snapshotId, Map<TableBucket, Long> bucketLogEndOffset) {
+        this.snapshotId = snapshotId;
+        this.bucketLogEndOffset = bucketLogEndOffset;
+    }
+
+    public long getSnapshotId() {
+        return snapshotId;
+    }
+
+    public Optional<Long> getLogEndOffset(TableBucket tableBucket) {
+        return Optional.ofNullable(bucketLogEndOffset.get(tableBucket));
+    }
+
+    public Map<TableBucket, Long> getBucketLogEndOffset() {
+        return bucketLogEndOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   This 'equals()' method does not check argument type.
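
   A minimal sketch of an `equals()` with the missing type check, based on the two fields visible in the quoted snippet (`snapshotId`, `bucketLogEndOffset`); the actual fix may differ:

   ```java
   @Override
   public boolean equals(Object o) {
       if (this == o) {
           return true;
       }
       // Reject null and any non-LakeTableSnapshot argument before casting.
       if (!(o instanceof LakeTableSnapshot)) {
           return false;
       }
       LakeTableSnapshot that = (LakeTableSnapshot) o;
       return snapshotId == that.snapshotId
               && Objects.equals(bucketLogEndOffset, that.bucketLogEndOffset);
   }
   ```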



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FSDataOutputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.FlussPaths;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.fluss.metrics.registry.MetricRegistry.LOG;
+
+/** The helper to handle {@link LakeTable}. */
+public class LakeTableHelper {
+
+    private final ZooKeeperClient zkClient;
+    private final String remoteDataDir;
+
+    public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) {
+        this.zkClient = zkClient;
+        this.remoteDataDir = remoteDataDir;
+    }
+
+    /**
+     * Upserts a lake table snapshot for the given table.
+     *
+     * <p>This method merges the new snapshot with the existing one (if any) and stores it (data in
+     * remote file, the remote file path in ZK).
+     *
+     * @param tableId the table ID
+     * @param tablePath the table path
+     * @param lakeTableSnapshot the new snapshot to upsert
+     * @throws Exception if the operation fails
+     */
+    public void upsertLakeTable(
+            long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot)
+            throws Exception {
+        Optional<LakeTable> optPreviousLakeTable = zkClient.getLakeTable(tableId);
+        // Merge with previous snapshot if exists
+        if (optPreviousLakeTable.isPresent()) {
+            lakeTableSnapshot =
+                    mergeLakeTable(
+                            optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot);
+        }
+
+        // store the lake table snapshot into a file
+        FsPath lakeTableSnapshotFsPath =
+                storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot);
+
+        LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata =
+                new LakeTable.LakeSnapshotMetadata(
+                        lakeTableSnapshot.getSnapshotId(),
+                        // use the lake table snapshot file as the tiered offsets file since
+                        // the the table snapshot file will contain the tiered log end offsets
+                        lakeTableSnapshotFsPath,
+                        // currently, readableOffsetsFilePath is always same with
+                        // tieredOffsetsFilePath, but in the future we'll commit a readable offsets
+                        // separately to mark the what's the readable offsets for a snapshot since

Review Comment:
   Grammar issue: "to mark the what's the readable offsets" should be "to mark 
what the readable offsets are" or "to mark the readable offsets".



##########
fluss-common/src/main/java/org/apache/fluss/lake/committer/BucketOffset.java:
##########
@@ -57,23 +51,19 @@ public Long getPartitionId() {
         return partitionId;
     }
 
-    @Nullable
-    public String getPartitionQualifiedName() {
-        return partitionQualifiedName;
-    }
-
     @Override
     public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
         BucketOffset that = (BucketOffset) o;
-        return bucket == that.bucket
-                && logOffset == that.logOffset
-                && Objects.equals(partitionId, that.partitionId)
-                && Objects.equals(partitionQualifiedName, that.partitionQualifiedName);
+        return logOffset == that.logOffset
+                && bucket == that.bucket
+                && Objects.equals(partitionId, that.partitionId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(logOffset, bucket, partitionId);
     }

Review Comment:
   Missing `hashCode()` method override. The class overrides `equals()` but not 
`hashCode()`, which violates the equals-hashCode contract in Java. This can 
lead to incorrect behavior when using instances in hash-based collections.
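
   Note the quoted diff does add a matching `hashCode()` over the same fields as `equals()` (logOffset, bucket, partitionId), which is what the contract requires. A hypothetical usage sketch of why the pair must stay in sync (the `BucketOffset` constructor signature below is an assumption, not taken from the diff):

   ```java
   // Two logically equal keys must hash identically, or HashMap lookups miss.
   Map<BucketOffset, Long> offsets = new HashMap<>();
   offsets.put(new BucketOffset(100L, 0, 1L), 100L);
   // Finds the entry only because hashCode() uses the same fields as equals().
   Long found = offsets.get(new BucketOffset(100L, 0, 1L));
   ```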



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Json serializer and deserializer for {@link LakeTableSnapshot}.
+ *
+ * <p>This serde supports two storage format versions:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Each bucket object contains full information including repeated
+ *       partition names and partition_id in each bucket entry.
+ *   <li>Version 2 (current): Compact format that optimizes layout:
+ *       <ul>
+ *         <li>Non-partition table uses array format: "buckets": [100, 200, 300], where array index
+ *             represents bucket id (0, 1, 2) and value represents log_end_offset. For buckets
+ *             without endoffset, -1 is written. Missing bucket ids in the sequence are also filled
+ *             with -1.
+ *         <li>Partition table uses object format: "buckets": {"1": [100, 200], "2": [300, 400]},
+ *             where key is partition id, array index represents bucket id (0, 1) and value
+ *             represents log_end_offset. For buckets without endoffset, -1 is written. Missing
+ *             bucket ids in the sequence are also filled with -1.
+ *       </ul>
+ *       During deserialization, values of -1 are ignored and not added to the bucket log end *
+ *       offset map.
+ * </ul>
+ */
+public class LakeTableSnapshotJsonSerde
+        implements JsonSerializer<LakeTableSnapshot>, JsonDeserializer<LakeTableSnapshot> {
+
+    public static final LakeTableSnapshotJsonSerde INSTANCE = new LakeTableSnapshotJsonSerde();
+
+    private static final long UNKNOW_LOG_OFFSET = -1;

Review Comment:
   Typo in constant name: `UNKNOW_LOG_OFFSET` should be `UNKNOWN_LOG_OFFSET`.



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerdeUtils;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Json serializer and deserializer for {@link LakeTableSnapshot}.
+ *
+ * <p>This serde supports two storage format versions:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Each bucket object contains full information including repeated
+ *       partition names and partition_id in each bucket entry.
+ *   <li>Version 2 (current): Compact format that optimizes layout:
+ *       <ul>
+ *         <li>Non-partition table uses array format: "buckets": [100, 200, 300], where array index
+ *             represents bucket id (0, 1, 2) and value represents log_end_offset. For buckets
+ *             without endoffset, -1 is written. Missing bucket ids in the sequence are also filled
+ *             with -1.
+ *         <li>Partition table uses object format: "buckets": {"1": [100, 200], "2": [300, 400]},
+ *             where key is partition id, array index represents bucket id (0, 1) and value
+ *             represents log_end_offset. For buckets without endoffset, -1 is written. Missing
+ *             bucket ids in the sequence are also filled with -1.
+ *       </ul>
+ *       During deserialization, values of -1 are ignored and not added to the bucket log end *
+ *       offset map.

Review Comment:
   The comment on line 60 has a trailing asterisk ("offset map. *") which 
appears to be a formatting error or incomplete thought.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingLakeTieringFactory.java:
##########
@@ -63,7 +63,10 @@ public SimpleVersionedSerializer<TestingWriteResult> getWriteResultSerializer()
     @Override
     public LakeCommitter<TestingWriteResult, TestingCommittable> createLakeCommitter(
             CommitterInitContext committerInitContext) throws IOException {
-        return testingLakeCommitter == null ? new TestingLakeCommitter() : testingLakeCommitter;
+        if (testingLakeCommitter == null) {
+            this.testingLakeCommitter = new TestingLakeCommitter();

Review Comment:
   The field `testingLakeCommitter` was changed from `final` to non-final to 
allow reassignment in `createLakeCommitter`. However, this creates a potential 
concurrency issue - if `createLakeCommitter` is called concurrently from 
multiple threads, multiple instances could be created and the field could be 
overwritten, leading to inconsistent behavior. Consider using synchronization 
or making this method thread-safe.
   ```suggestion
               synchronized (this) {
                   if (testingLakeCommitter == null) {
                       this.testingLakeCommitter = new TestingLakeCommitter();
                   }
               }
   ```
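
   One caveat on the suggestion above: if callers may also read the field without taking the lock, the conventional pattern is double-checked locking with a `volatile` field (or simply declaring the method `synchronized`). A minimal sketch of the former, using the field and method names from the diff:

   ```java
   private volatile TestingLakeCommitter testingLakeCommitter;

   @Override
   public LakeCommitter<TestingWriteResult, TestingCommittable> createLakeCommitter(
           CommitterInitContext committerInitContext) throws IOException {
       if (testingLakeCommitter == null) { // fast path, no lock taken
           synchronized (this) {
               if (testingLakeCommitter == null) { // re-check under the lock
                   testingLakeCommitter = new TestingLakeCommitter();
               }
           }
       }
       return testingLakeCommitter;
   }
   ```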



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.metrics.registry.MetricRegistry.LOG;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Represents lake table snapshot information stored in {@link ZkData.LakeTableZNode}.
+ *
+ * <p>This class supports two storage formats:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): Contains the full {@link LakeTableSnapshot} data directly
+ *   <li>Version 2 (current): Contains a list of lake snapshot, recording the metadata file path for
+ *       different lake snapshots, with actual metadata storing in file
+ * </ul>
+ *
+ * @see LakeTableJsonSerde for JSON serialization and deserialization
+ */
+public class LakeTable {
+
+    // Version 2 (current):
+    // a list of lake snapshot metadata, record the metadata for different lake snapshots
+    @Nullable private final List<LakeSnapshotMetadata> lakeSnapshotMetadata;
+
+    // Version 1 (legacy): the full lake table snapshot info stored in ZK, will be null in version2
+    @Nullable private final LakeTableSnapshot lakeTableSnapshot;
+
+    /**
+     * Creates a LakeTable from a LakeTableSnapshot (version 1 format).
+     *
+     * @param lakeTableSnapshot the snapshot data
+     */
+    public LakeTable(LakeTableSnapshot lakeTableSnapshot) {
+        this(lakeTableSnapshot, null);
+    }
+
+    /**
+     * Creates a LakeTable with a lake snapshot metadata (version 2 format).
+     *
+     * @param lakeSnapshotMetadata the metadata containing the file path to the snapshot data
+     */
+    public LakeTable(LakeSnapshotMetadata lakeSnapshotMetadata) {
+        this(null, Collections.singletonList(lakeSnapshotMetadata));
+    }
+
+    /**
+     * Creates a LakeTable with a list of lake snapshot metadata (version 2 format).
+     *
+     * @param lakeSnapshotMetadata the list of lake snapshot metadata
+     */
+    public LakeTable(List<LakeSnapshotMetadata> lakeSnapshotMetadata) {
+        this(null, lakeSnapshotMetadata);
+    }
+
+    private LakeTable(
+            @Nullable LakeTableSnapshot lakeTableSnapshot,
+            List<LakeSnapshotMetadata> lakeSnapshotMetadata) {
+        this.lakeTableSnapshot = lakeTableSnapshot;
+        this.lakeSnapshotMetadata = lakeSnapshotMetadata;
+    }
+
+    @Nullable
+    public LakeSnapshotMetadata getLakeTableLatestSnapshot() {
+        if (lakeSnapshotMetadata != null && !lakeSnapshotMetadata.isEmpty()) {
+            return lakeSnapshotMetadata.get(0);
+        }
+        return null;
+    }
+
+    @Nullable
+    public List<LakeSnapshotMetadata> getLakeSnapshotMetadata() {
+        return lakeSnapshotMetadata;
+    }
+
+    /**
+     * Get the latest table snapshot for the lake table.
+     *
+     * <p>If this LakeTable was created from a LakeTableSnapshot (version 1), returns it directly.
+     * Otherwise, reads the snapshot data from the lake snapshot file.
+     *
+     * @return the LakeTableSnapshot
+     */
+    public LakeTableSnapshot getLatestTableSnapshot() throws Exception {
+        if (lakeTableSnapshot != null) {
+            return lakeTableSnapshot;
+        }
+        FsPath tieredOffsetsFilePath =
+                checkNotNull(getLakeTableLatestSnapshot()).tieredOffsetsFilePath;
+        FSDataInputStream inputStream =
+                tieredOffsetsFilePath.getFileSystem().open(tieredOffsetsFilePath);
+        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+            IOUtils.copyBytes(inputStream, outputStream, true);
+            return LakeTableSnapshotJsonSerde.fromJson(outputStream.toByteArray());
+        }
+    }
+
+    /** The lake snapshot metadata entry stored in zk lake table. */
+    public static class LakeSnapshotMetadata {
+        private final long snapshotId;
+
+        // the file path to file storing the tiered offsets,
+        // it points a file storing LakeTableSnapshot which only include tiered offsets
+        private final FsPath tieredOffsetsFilePath;
+
+        // the file path to file storing the readable offsets
+        private final FsPath readableOffsetsFilePath;
+
+        public LakeSnapshotMetadata(
+                long snapshotId, FsPath tieredOffsetsFilePath, FsPath readableOffsetsFilePath) {
+            this.snapshotId = snapshotId;
+            this.tieredOffsetsFilePath = tieredOffsetsFilePath;
+            this.readableOffsetsFilePath = readableOffsetsFilePath;
+        }
+
+        public long getSnapshotId() {
+            return snapshotId;
+        }
+
+        public FsPath getTieredOffsetsFilePath() {
+            return tieredOffsetsFilePath;
+        }
+
+        public FsPath getReadableOffsetsFilePath() {
+            return readableOffsetsFilePath;
+        }
+
+        public void discard() {
+            if (tieredOffsetsFilePath != null) {
+                delete(tieredOffsetsFilePath);
+            }
+            if (readableOffsetsFilePath != null
+                    && readableOffsetsFilePath != tieredOffsetsFilePath) {
+                delete(readableOffsetsFilePath);
+            }
+        }
+
+        private void delete(FsPath fsPath) {
+            try {
+                FileSystem fileSystem = tieredOffsetsFilePath.getFileSystem();
+                if (fileSystem.exists(tieredOffsetsFilePath)) {
+                    fileSystem.delete(tieredOffsetsFilePath, false);
+                }

Review Comment:
   The `delete` method uses `tieredOffsetsFilePath` instead of the `fsPath` 
parameter to get the file system and check/delete the file. This will cause the 
method to always delete `tieredOffsetsFilePath` regardless of which path is 
passed as the parameter.
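
   A minimal corrected sketch, using the `fsPath` parameter throughout (the quoted snippet cuts off before the catch block, so the handling shown here is an assumption):

   ```java
   private void delete(FsPath fsPath) {
       try {
           FileSystem fileSystem = fsPath.getFileSystem();
           if (fileSystem.exists(fsPath)) {
               // 'false' keeps the non-recursive delete of the original call.
               fileSystem.delete(fsPath, false);
           }
       } catch (Exception e) {
           // Assumed handling: the original snippet ends before its catch block.
           LOG.warn("Failed to delete lake snapshot file {}.", fsPath, e);
       }
   }
   ```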



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
