Copilot commented on code in PR #2037: URL: https://github.com/apache/fluss/pull/2037#discussion_r2600833598
########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerdeUtils; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * Json serializer and deserializer for {@link LakeTableSnapshot}. + * + * <p>This serde supports two storage format versions: + * + * <ul> + * <li>Version 1 (legacy): Each bucket object contains full information including repeated + * partition names and partition_id in each bucket entry. + * <li>Version 2 (current): Compact format that optimizes layout: + * <ul> + * <li>Non-partition table uses array format: "buckets": [100, 200, 300], where array index + * represents bucket id (0, 1, 2) and value represents log_end_offset. For buckets + * without endoffset, -1 is written. Missing bucket ids in the sequence are also filled + * with -1. + * <li>Partition table uses object format: "buckets": {"1": [100, 200], "2": [300, 400]}, + * where key is partition id, array index represents bucket id (0, 1) and value + * represents log_end_offset. For buckets without endoffset, -1 is written. Missing Review Comment: Typo in comment: "endoffset" should be "end offset" (two words), occurs twice in this line. ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.fs.FSDataOutputStream; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.utils.FlussPaths; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.fluss.metrics.registry.MetricRegistry.LOG; + +/** The helper to handle {@link LakeTable}. */ +public class LakeTableHelper { + + private final ZooKeeperClient zkClient; + private final String remoteDataDir; + + public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) { + this.zkClient = zkClient; + this.remoteDataDir = remoteDataDir; + } + + /** + * Upserts a lake table snapshot for the given table. + * + * <p>This method merges the new snapshot with the existing one (if any) and stores it (data in + * remote file, the remote file path in ZK). + * + * @param tableId the table ID + * @param tablePath the table path + * @param lakeTableSnapshot the new snapshot to upsert + * @throws Exception if the operation fails + */ + public void upsertLakeTable( + long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot) + throws Exception { + Optional<LakeTable> optPreviousLakeTable = zkClient.getLakeTable(tableId); + // Merge with previous snapshot if exists + if (optPreviousLakeTable.isPresent()) { + lakeTableSnapshot = + mergeLakeTable( + optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot); + } + + // store the lake table snapshot into a file + FsPath lakeTableSnapshotFsPath = + storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot); + + LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata = + new LakeTable.LakeSnapshotMetadata( + lakeTableSnapshot.getSnapshotId(), + // use the lake table snapshot file as the tiered offsets file since + // the the table snapshot file will contain the tiered log end offsets Review Comment: Typo in comment: "the the table" should be "the table" (duplicate word). ########## fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.ZooKeeperUtils; +import org.apache.fluss.server.zk.data.TableRegistration; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.testutils.common.AllCallbackWrapper; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +/** The UT for {@link LakeTableHelper}. */ +class LakeTableHelperTest { + + @RegisterExtension + public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + private static ZooKeeperClient zookeeperClient; + + @BeforeAll + static void beforeAll() { + zookeeperClient = + ZOO_KEEPER_EXTENSION_WRAPPER + .getCustomExtension() + .getZooKeeperClient(NOPErrorHandler.INSTANCE); + } + + @AfterEach + void afterEach() { + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot(); + } + + @AfterAll + static void afterAll() { + zookeeperClient.close(); + } + + @Test + void testUpsertLakeTableCompatible(@TempDir Path tempDir) throws Exception { + // Create a ZooKeeperClient with REMOTE_DATA_DIR configuration + Configuration conf = new Configuration(); + conf.setString( + ConfigOptions.ZOOKEEPER_ADDRESS, + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString()); + LakeTableHelper lakeTableHelper = new LakeTableHelper(zookeeperClient, tempDir.toString()); + try (ZooKeeperClient zooKeeperClient = + ZooKeeperUtils.startZookeeperClient(conf, NOPErrorHandler.INSTANCE)) { + // first create a table + long tableId = 1; + TablePath tablePath = TablePath.of("test_db", "test_table"); + TableRegistration tableReg = + new TableRegistration( + tableId, + "test table", + Collections.emptyList(), + new TableDescriptor.TableDistribution( + 1, Collections.singletonList("a")), + Collections.emptyMap(), + Collections.emptyMap(), + System.currentTimeMillis(), + System.currentTimeMillis()); + zookeeperClient.registerTable(tablePath, tableReg); + + // Create a legacy version 1 LakeTableSnapshot (full data in ZK) + long snapshotId = 1L; + + Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>(); + bucketLogEndOffset.put(new TableBucket(tableId, 0), 100L); + bucketLogEndOffset.put(new TableBucket(tableId, 1), 200L); + + LakeTableSnapshot lakeTableSnapshot = + new LakeTableSnapshot(snapshotId, bucketLogEndOffset); + // Write version 1 format data directly to ZK (simulating old system behavior) + String zkPath = ZkData.LakeTableZNode.path(tableId); + byte[] version1Data = + LakeTableSnapshotJsonSerde.toJsonVersion1(lakeTableSnapshot, tableId); + zooKeeperClient + .getCuratorClient() + .create() + .creatingParentsIfNeeded() + 
.forPath(zkPath, version1Data); + + // Verify version 1 data can be read + Optional<LakeTable> optionalLakeTable = zooKeeperClient.getLakeTable(tableId); + assertThat(optionalLakeTable).isPresent(); + LakeTable lakeTable = optionalLakeTable.get(); + assertThat(lakeTable.getLatestTableSnapshot()).isEqualTo(lakeTableSnapshot); + + // Test: Call upsertLakeTableSnapshot with new snapshot data + // This should read the old version 1 data, merge it, and write as version 2 + Map<TableBucket, Long> newBucketLogEndOffset = new HashMap<>(); + newBucketLogEndOffset.put(new TableBucket(tableId, 0), 1500L); // Updated offset + newBucketLogEndOffset.put(new TableBucket(tableId, 1), 2000L); // new offset + + long snapshot2Id = 2L; + LakeTableSnapshot snapshot2 = new LakeTableSnapshot(snapshot2Id, newBucketLogEndOffset); + lakeTableHelper.upsertLakeTable(tableId, tablePath, snapshot2); + + // Verify: New version 2 data can be read + Optional<LakeTable> optLakeTableAfter = zooKeeperClient.getLakeTable(tableId); + assertThat(optLakeTableAfter).isPresent(); + LakeTable lakeTableAfter = optLakeTableAfter.get(); + assertThat(lakeTableAfter.getLakeTableLatestSnapshot()) + .isNotNull(); // Version 2 has file path + + // Verify: The lake snapshot file exists + FsPath snapshot2FileHandle = + lakeTableAfter.getLakeTableLatestSnapshot().getReadableOffsetsFilePath(); + FileSystem fileSystem = snapshot2FileHandle.getFileSystem(); + assertThat(fileSystem.exists(snapshot2FileHandle)).isTrue(); + + Optional<LakeTableSnapshot> optMergedSnapshot = + zooKeeperClient.getLakeTableSnapshot(tableId); + assertThat(optMergedSnapshot).isPresent(); + LakeTableSnapshot mergedSnapshot = optMergedSnapshot.get(); + + // verify the snapshot should merge previous snapshot + assertThat(mergedSnapshot.getSnapshotId()).isEqualTo(snapshot2Id); + Map<TableBucket, Long> expectedBucketLogEndOffset = new HashMap<>(bucketLogEndOffset); + expectedBucketLogEndOffset.putAll(newBucketLogEndOffset); + assertThat(mergedSnapshot.getBucketLogEndOffset()) + .isEqualTo(expectedBucketLogEndOffset); + + // add a new snapshot 3 again, verify snapshot + long snapshot3Id = 3L; + LakeTableSnapshot snapshot3 = new LakeTableSnapshot(snapshot3Id, newBucketLogEndOffset); + lakeTableHelper.upsertLakeTable(tableId, tablePath, snapshot3); + // verify snapshot 3 is discard Review Comment: Typo in comment: "discard" should be "discarded" (past tense) to match the assertion that follows. ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.fs.FSDataOutputStream; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.utils.FlussPaths; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.fluss.metrics.registry.MetricRegistry.LOG; + +/** The helper to handle {@link LakeTable}. */ +public class LakeTableHelper { + + private final ZooKeeperClient zkClient; + private final String remoteDataDir; + + public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) { + this.zkClient = zkClient; + this.remoteDataDir = remoteDataDir; + } + + /** + * Upserts a lake table snapshot for the given table. + * + * <p>This method merges the new snapshot with the existing one (if any) and stores it (data in + * remote file, the remote file path in ZK). + * + * @param tableId the table ID + * @param tablePath the table path + * @param lakeTableSnapshot the new snapshot to upsert + * @throws Exception if the operation fails + */ + public void upsertLakeTable( + long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot) + throws Exception { + Optional<LakeTable> optPreviousLakeTable = zkClient.getLakeTable(tableId); + // Merge with previous snapshot if exists + if (optPreviousLakeTable.isPresent()) { + lakeTableSnapshot = + mergeLakeTable( + optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot); + } + + // store the lake table snapshot into a file + FsPath lakeTableSnapshotFsPath = + storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot); + + LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata = + new LakeTable.LakeSnapshotMetadata( + lakeTableSnapshot.getSnapshotId(), + // use the lake table snapshot file as the tiered offsets file since + // the the table snapshot file will contain the tiered log end offsets + lakeTableSnapshotFsPath, + // currently, readableOffsetsFilePath is always same with + // tieredOffsetsFilePath, but in the future we'll commit a readable offsets + // separately to mark the what's the readable offsets for a snapshot since + // in paimon dv table, tiered log end offsets is not same with readable + // offsets + lakeTableSnapshotFsPath); + + // currently, we keep only one lake snapshot metadata in zk, + // todo: in solve paimon dv union read issue #2121, we'll keep multiple lake snapshot Review Comment: Grammar issue: "in solve paimon dv union read issue" should be "to solve paimon dv union read issue" or "in solving paimon dv union read issue". ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerdeUtils; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * Json serializer and deserializer for {@link LakeTableSnapshot}. + * + * <p>This serde supports two storage format versions: + * + * <ul> + * <li>Version 1 (legacy): Each bucket object contains full information including repeated + * partition names and partition_id in each bucket entry. + * <li>Version 2 (current): Compact format that optimizes layout: + * <ul> + * <li>Non-partition table uses array format: "buckets": [100, 200, 300], where array index + * represents bucket id (0, 1, 2) and value represents log_end_offset. For buckets + * without endoffset, -1 is written. Missing bucket ids in the sequence are also filled Review Comment: Typo in comment: "endoffset" should be "end offset" (two words). ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshot.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.metadata.TableBucket; + +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** The snapshot info for a table. 
*/ +public class LakeTableSnapshot { + + // the last committed snapshot id in lake + private final long snapshotId; + + // the log offset of the bucket + // mapping from bucket id to log end offset or max timestamp, + // will be null if log offset is unknown such as reading the snapshot of primary key table + private final Map<TableBucket, Long> bucketLogEndOffset; + + public LakeTableSnapshot(long snapshotId, Map<TableBucket, Long> bucketLogEndOffset) { + this.snapshotId = snapshotId; + this.bucketLogEndOffset = bucketLogEndOffset; + } + + public long getSnapshotId() { + return snapshotId; + } + + public Optional<Long> getLogEndOffset(TableBucket tableBucket) { + return Optional.ofNullable(bucketLogEndOffset.get(tableBucket)); + } + + public Map<TableBucket, Long> getBucketLogEndOffset() { + return bucketLogEndOffset; + } + + @Override + public boolean equals(Object o) { Review Comment: This 'equals()' method does not check argument type. ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.fs.FSDataOutputStream; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.utils.FlussPaths; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.fluss.metrics.registry.MetricRegistry.LOG; + +/** The helper to handle {@link LakeTable}. */ +public class LakeTableHelper { + + private final ZooKeeperClient zkClient; + private final String remoteDataDir; + + public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) { + this.zkClient = zkClient; + this.remoteDataDir = remoteDataDir; + } + + /** + * Upserts a lake table snapshot for the given table. + * + * <p>This method merges the new snapshot with the existing one (if any) and stores it (data in + * remote file, the remote file path in ZK). 
+ * + * @param tableId the table ID + * @param tablePath the table path + * @param lakeTableSnapshot the new snapshot to upsert + * @throws Exception if the operation fails + */ + public void upsertLakeTable( + long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot) + throws Exception { + Optional<LakeTable> optPreviousLakeTable = zkClient.getLakeTable(tableId); + // Merge with previous snapshot if exists + if (optPreviousLakeTable.isPresent()) { + lakeTableSnapshot = + mergeLakeTable( + optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot); + } + + // store the lake table snapshot into a file + FsPath lakeTableSnapshotFsPath = + storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot); + + LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata = + new LakeTable.LakeSnapshotMetadata( + lakeTableSnapshot.getSnapshotId(), + // use the lake table snapshot file as the tiered offsets file since + // the the table snapshot file will contain the tiered log end offsets + lakeTableSnapshotFsPath, + // currently, readableOffsetsFilePath is always same with + // tieredOffsetsFilePath, but in the future we'll commit a readable offsets + // separately to mark the what's the readable offsets for a snapshot since Review Comment: Grammar issue: "to mark the what's the readable offsets" should be "to mark what the readable offsets are" or "to mark the readable offsets". ########## fluss-common/src/main/java/org/apache/fluss/lake/committer/BucketOffset.java: ########## @@ -57,23 +51,19 @@ public Long getPartitionId() { return partitionId; } - @Nullable - public String getPartitionQualifiedName() { - return partitionQualifiedName; - } - @Override public boolean equals(Object o) { - if (this == o) { - return true; - } if (o == null || getClass() != o.getClass()) { return false; } BucketOffset that = (BucketOffset) o; - return bucket == that.bucket - && logOffset == that.logOffset - && Objects.equals(partitionId, that.partitionId) - && Objects.equals(partitionQualifiedName, that.partitionQualifiedName); + return logOffset == that.logOffset + && bucket == that.bucket + && Objects.equals(partitionId, that.partitionId); + } + + @Override + public int hashCode() { + return Objects.hash(logOffset, bucket, partitionId); } Review Comment: Missing `hashCode()` method override. The class overrides `equals()` but not `hashCode()`, which violates the equals-hashCode contract in Java. This can lead to incorrect behavior when using instances in hash-based collections. ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerdeUtils; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * Json serializer and deserializer for {@link LakeTableSnapshot}. + * + * <p>This serde supports two storage format versions: + * + * <ul> + * <li>Version 1 (legacy): Each bucket object contains full information including repeated + * partition names and partition_id in each bucket entry. + * <li>Version 2 (current): Compact format that optimizes layout: + * <ul> + * <li>Non-partition table uses array format: "buckets": [100, 200, 300], where array index + * represents bucket id (0, 1, 2) and value represents log_end_offset. For buckets + * without endoffset, -1 is written. Missing bucket ids in the sequence are also filled + * with -1. + * <li>Partition table uses object format: "buckets": {"1": [100, 200], "2": [300, 400]}, + * where key is partition id, array index represents bucket id (0, 1) and value + * represents log_end_offset. For buckets without endoffset, -1 is written. Missing + * bucket ids in the sequence are also filled with -1. + * </ul> + * During deserialization, values of -1 are ignored and not added to the bucket log end * + * offset map. + * </ul> + */ +public class LakeTableSnapshotJsonSerde + implements JsonSerializer<LakeTableSnapshot>, JsonDeserializer<LakeTableSnapshot> { + + public static final LakeTableSnapshotJsonSerde INSTANCE = new LakeTableSnapshotJsonSerde(); + + private static final long UNKNOW_LOG_OFFSET = -1; Review Comment: Typo in constant name: `UNKNOW_LOG_OFFSET` should be `UNKNOWN_LOG_OFFSET`. ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java: ########## @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerdeUtils; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * Json serializer and deserializer for {@link LakeTableSnapshot}. + * + * <p>This serde supports two storage format versions: + * + * <ul> + * <li>Version 1 (legacy): Each bucket object contains full information including repeated + * partition names and partition_id in each bucket entry. + * <li>Version 2 (current): Compact format that optimizes layout: + * <ul> + * <li>Non-partition table uses array format: "buckets": [100, 200, 300], where array index + * represents bucket id (0, 1, 2) and value represents log_end_offset. For buckets + * without endoffset, -1 is written. Missing bucket ids in the sequence are also filled + * with -1. + * <li>Partition table uses object format: "buckets": {"1": [100, 200], "2": [300, 400]}, + * where key is partition id, array index represents bucket id (0, 1) and value + * represents log_end_offset. For buckets without endoffset, -1 is written. Missing + * bucket ids in the sequence are also filled with -1. + * </ul> + * During deserialization, values of -1 are ignored and not added to the bucket log end * + * offset map. Review Comment: The comment on line 60 has a trailing asterisk ("offset map. *") which appears to be a formatting error or incomplete thought. ########## fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingLakeTieringFactory.java: ########## @@ -63,7 +63,10 @@ public SimpleVersionedSerializer<TestingWriteResult> getWriteResultSerializer() @Override public LakeCommitter<TestingWriteResult, TestingCommittable> createLakeCommitter( CommitterInitContext committerInitContext) throws IOException { - return testingLakeCommitter == null ? new TestingLakeCommitter() : testingLakeCommitter; + if (testingLakeCommitter == null) { + this.testingLakeCommitter = new TestingLakeCommitter(); Review Comment: The field `testingLakeCommitter` was changed from `final` to non-final to allow reassignment in `createLakeCommitter`. However, this creates a potential concurrency issue - if `createLakeCommitter` is called concurrently from multiple threads, multiple instances could be created and the field could be overwritten, leading to inconsistent behavior. Consider using synchronization or making this method thread-safe. ```suggestion synchronized (this) { if (testingLakeCommitter == null) { this.testingLakeCommitter = new TestingLakeCommitter(); } } ``` ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.fs.FSDataInputStream; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.utils.IOUtils; + +import javax.annotation.Nullable; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.apache.fluss.metrics.registry.MetricRegistry.LOG; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Represents lake table snapshot information stored in {@link ZkData.LakeTableZNode}. + * + * <p>This class supports two storage formats: + * + * <ul> + * <li>Version 1 (legacy): Contains the full {@link LakeTableSnapshot} data directly + * <li>Version 2 (current): Contains a list of lake snapshot, recording the metadata file path for + * different lake snapshots, with actual metadata storing in file + * </ul> + * + * @see LakeTableJsonSerde for JSON serialization and deserialization + */ +public class LakeTable { + + // Version 2 (current): + // a list of lake snapshot metadata, record the metadata for different lake snapshots + @Nullable private final List<LakeSnapshotMetadata> lakeSnapshotMetadata; + + // Version 1 (legacy): the full lake table snapshot info stored in ZK, will be null in version2 + @Nullable private final LakeTableSnapshot lakeTableSnapshot; + + /** + * Creates a LakeTable from a LakeTableSnapshot (version 1 format). + * + * @param lakeTableSnapshot the snapshot data + */ + public LakeTable(LakeTableSnapshot lakeTableSnapshot) { + this(lakeTableSnapshot, null); + } + + /** + * Creates a LakeTable with a lake snapshot metadata (version 2 format). + * + * @param lakeSnapshotMetadata the metadata containing the file path to the snapshot data + */ + public LakeTable(LakeSnapshotMetadata lakeSnapshotMetadata) { + this(null, Collections.singletonList(lakeSnapshotMetadata)); + } + + /** + * Creates a LakeTable with a list of lake snapshot metadata (version 2 format). + * + * @param lakeSnapshotMetadata the list of lake snapshot metadata + */ + public LakeTable(List<LakeSnapshotMetadata> lakeSnapshotMetadata) { + this(null, lakeSnapshotMetadata); + } + + private LakeTable( + @Nullable LakeTableSnapshot lakeTableSnapshot, + List<LakeSnapshotMetadata> lakeSnapshotMetadata) { + this.lakeTableSnapshot = lakeTableSnapshot; + this.lakeSnapshotMetadata = lakeSnapshotMetadata; + } + + @Nullable + public LakeSnapshotMetadata getLakeTableLatestSnapshot() { + if (lakeSnapshotMetadata != null && !lakeSnapshotMetadata.isEmpty()) { + return lakeSnapshotMetadata.get(0); + } + return null; + } + + @Nullable + public List<LakeSnapshotMetadata> getLakeSnapshotMetadata() { + return lakeSnapshotMetadata; + } + + /** + * Get the latest table snapshot for the lake table. 
+ * + * <p>If this LakeTable was created from a LakeTableSnapshot (version 1), returns it directly. + * Otherwise, reads the snapshot data from the lake snapshot file. + * + * @return the LakeTableSnapshot + */ + public LakeTableSnapshot getLatestTableSnapshot() throws Exception { + if (lakeTableSnapshot != null) { + return lakeTableSnapshot; + } + FsPath tieredOffsetsFilePath = + checkNotNull(getLakeTableLatestSnapshot()).tieredOffsetsFilePath; + FSDataInputStream inputStream = + tieredOffsetsFilePath.getFileSystem().open(tieredOffsetsFilePath); + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + IOUtils.copyBytes(inputStream, outputStream, true); + return LakeTableSnapshotJsonSerde.fromJson(outputStream.toByteArray()); + } + } + + /** The lake snapshot metadata entry stored in zk lake table. */ + public static class LakeSnapshotMetadata { + private final long snapshotId; + + // the file path to file storing the tiered offsets, + // it points a file storing LakeTableSnapshot which only include tiered offsets + private final FsPath tieredOffsetsFilePath; + + // the file path to file storing the readable offsets + private final FsPath readableOffsetsFilePath; + + public LakeSnapshotMetadata( + long snapshotId, FsPath tieredOffsetsFilePath, FsPath readableOffsetsFilePath) { + this.snapshotId = snapshotId; + this.tieredOffsetsFilePath = tieredOffsetsFilePath; + this.readableOffsetsFilePath = readableOffsetsFilePath; + } + + public long getSnapshotId() { + return snapshotId; + } + + public FsPath getTieredOffsetsFilePath() { + return tieredOffsetsFilePath; + } + + public FsPath getReadableOffsetsFilePath() { + return readableOffsetsFilePath; + } + + public void discard() { + if (tieredOffsetsFilePath != null) { + delete(tieredOffsetsFilePath); + } + if (readableOffsetsFilePath != null + && readableOffsetsFilePath != tieredOffsetsFilePath) { + delete(readableOffsetsFilePath); + } + } + + private void delete(FsPath fsPath) { + try { + FileSystem fileSystem = tieredOffsetsFilePath.getFileSystem(); + if (fileSystem.exists(tieredOffsetsFilePath)) { + fileSystem.delete(tieredOffsetsFilePath, false); + } Review Comment: The `delete` method uses `tieredOffsetsFilePath` instead of the `fsPath` parameter to get the file system and check/delete the file. This will cause the method to always delete `tieredOffsetsFilePath` regardless of which path is passed as the parameter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
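The serde javadoc quoted above spells out the version-2 compact layout: bucket offsets become a dense array indexed by bucket id, -1 is written for buckets without a known end offset (including gaps in the id sequence), and -1 entries are skipped on read. A minimal round-trip sketch of that rule; the helper names here are hypothetical, not the PR's actual serde methods:

```java
import java.util.HashMap;
import java.util.Map;

class CompactBucketLayoutSketch {

    private static final long UNKNOWN_LOG_OFFSET = -1L;

    /** Flatten a sparse bucketId -> offset map into the dense version-2 array form. */
    static long[] toDenseOffsets(Map<Integer, Long> offsetsByBucket) {
        int maxBucketId =
                offsetsByBucket.keySet().stream().mapToInt(Integer::intValue).max().orElse(-1);
        long[] dense = new long[maxBucketId + 1];
        for (int bucketId = 0; bucketId <= maxBucketId; bucketId++) {
            // missing bucket ids in the sequence are filled with -1
            dense[bucketId] = offsetsByBucket.getOrDefault(bucketId, UNKNOWN_LOG_OFFSET);
        }
        return dense;
    }

    /** Rebuild the sparse map, ignoring -1 values just as the deserializer does. */
    static Map<Integer, Long> fromDenseOffsets(long[] dense) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (int bucketId = 0; bucketId < dense.length; bucketId++) {
            if (dense[bucketId] != UNKNOWN_LOG_OFFSET) {
                offsets.put(bucketId, dense[bucketId]);
            }
        }
        return offsets;
    }
}
```

For a partitioned table the javadoc nests the same array form under each partition id, e.g. "buckets": {"1": [100, 200], "2": [300, 400]}.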
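LakeTableHelperTest asserts that upserting snapshot 2 over snapshot 1 yields the union of both offset maps with the newer values winning per bucket. That merge reduces to a map overlay; a sketch under that assumption, with `mergeOffsets` as a hypothetical stand-in for the private `mergeLakeTable` in `LakeTableHelper`:

```java
import java.util.HashMap;
import java.util.Map;

class MergeOffsetsSketch {
    /** Overlay the latest snapshot's offsets on top of the previous snapshot's. */
    static <K> Map<K, Long> mergeOffsets(Map<K, Long> previous, Map<K, Long> latest) {
        Map<K, Long> merged = new HashMap<>(previous); // keep buckets only the old snapshot knew
        merged.putAll(latest); // newer log end offsets win for buckets present in both
        return merged;
    }
}
```

This mirrors the test's expectation: `new HashMap<>(bucketLogEndOffset)` followed by `putAll(newBucketLogEndOffset)`.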
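Two of the comments above concern the same Java contract: `LakeTableSnapshot.equals()` should reject arguments of a different runtime type before casting, and any class that overrides `equals()` must override `hashCode()` over the same fields. A hedged sketch of the conventional shape, reusing `BucketOffset`-like fields purely for illustration:

```java
import java.util.Objects;

final class BucketOffsetExample {
    private final long logOffset;
    private final int bucket;
    private final Long partitionId; // nullable for non-partitioned tables

    BucketOffsetExample(long logOffset, int bucket, Long partitionId) {
        this.logOffset = logOffset;
        this.bucket = bucket;
        this.partitionId = partitionId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        // reject null and any other runtime type before casting
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BucketOffsetExample that = (BucketOffsetExample) o;
        return logOffset == that.logOffset
                && bucket == that.bucket
                && Objects.equals(partitionId, that.partitionId);
    }

    @Override
    public int hashCode() {
        // hash exactly the fields that equals() compares
        return Objects.hash(logOffset, bucket, partitionId);
    }
}
```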
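On the `TestingLakeTieringFactory` comment: the suggested `synchronized (this)` block works, and the other standard idiom is double-checked locking on a volatile field, which keeps the already-initialized path lock-free. A sketch with a stand-in committer type, not the repository's actual test class:

```java
class LazyCommitterSketch {

    /** Stand-in for the real TestingLakeCommitter. */
    static class TestingLakeCommitter {}

    private volatile TestingLakeCommitter testingLakeCommitter;

    TestingLakeCommitter createLakeCommitter() {
        TestingLakeCommitter committer = testingLakeCommitter; // single volatile read on the fast path
        if (committer == null) {
            synchronized (this) {
                committer = testingLakeCommitter; // re-check under the lock
                if (committer == null) {
                    committer = new TestingLakeCommitter();
                    testingLakeCommitter = committer; // volatile write publishes safely
                }
            }
        }
        return committer;
    }
}
```

Whether the extra machinery is worth it for a test helper is debatable; the plain synchronized block from the suggestion is simpler and likely sufficient.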
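The final comment pins down a real bug: `LakeSnapshotMetadata#delete` reads the `tieredOffsetsFilePath` field instead of its `fsPath` parameter, so when the two paths differ, `discard()` would delete the tiered-offsets file twice and leak the readable-offsets file. A sketch of the corrected method; the catch block is elided in the quoted hunk, so its body here is only a guess:

```java
private void delete(FsPath fsPath) {
    try {
        FileSystem fileSystem = fsPath.getFileSystem();
        if (fileSystem.exists(fsPath)) {
            fileSystem.delete(fsPath, false); // non-recursive delete of the snapshot file
        }
    } catch (Exception e) {
        // the quoted hunk cuts off here; the original presumably logs a warning
    }
}
```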
