Copilot commented on code in PR #2037: URL: https://github.com/apache/fluss/pull/2037#discussion_r2587742529
########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.fs.FSDataInputStream; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.utils.IOUtils; + +import javax.annotation.Nullable; + +import java.io.ByteArrayOutputStream; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Represents lake table snapshot information stored in {@link ZkData.LakeTableZNode}. + * + * <p>This class supports two storage formats: + * + * <ul> + * <li>Version 1 (legacy): Contains the full {@link LakeTableSnapshot} data directly + * <li>Version 2 (current): Contains only the file paths points to the file storing {@link + * LakeTableSnapshot}, with actual data in remote file. It'll contain multiple file paths + * points to track different lake table snapshot. Review Comment: The comment states "It'll contain multiple file paths points to track different lake table snapshot" but currently the implementation only contains one file path (`lakeTableLatestSnapshotFileHandle`). The `lakeTableLatestCompactedSnapshotFileHandle` is declared but unused. The comment should either be updated to reflect the current single-file-path implementation, or if multiple paths are planned for future use, this should be clarified with a TODO comment. ```suggestion * LakeTableSnapshot}, with actual data in remote file. Currently, only a single file path * is supported to track the latest lake table snapshot. * <!-- TODO: Support multiple file paths for tracking different lake table snapshots in future versions. --> ``` ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.fs.FSDataInputStream; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.utils.IOUtils; + +import javax.annotation.Nullable; + +import java.io.ByteArrayOutputStream; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Represents lake table snapshot information stored in {@link ZkData.LakeTableZNode}. + * + * <p>This class supports two storage formats: + * + * <ul> + * <li>Version 1 (legacy): Contains the full {@link LakeTableSnapshot} data directly + * <li>Version 2 (current): Contains only the file paths points to the file storing {@link + * LakeTableSnapshot}, with actual data in remote file. It'll contain multiple file paths + * points to track different lake table snapshot. + * </ul> + * + * @see LakeTableJsonSerde for JSON serialization and deserialization + */ +public class LakeTable { + + // Version 2 (current): + // the pointer to the file stores the latest known LakeTableSnapshot, will be null in + // version1 + @Nullable private final FsPath lakeTableLatestSnapshotFileHandle; + + // the pointer to the file stores the latest known compacted LakeTableSnapshot, will be null in + // version1 or no any known compacted LakeTableSnapshot + @Nullable private final FsPath lakeTableLatestCompactedSnapshotFileHandle = null; + Review Comment: The field `lakeTableLatestCompactedSnapshotFileHandle` is declared but never used. It's initialized to `null` and has no getter or setter. Consider removing this unused field or implementing the functionality if it's planned for future use. If it's a placeholder for future functionality, add a TODO comment explaining the intention. ```suggestion ``` ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java: ########## @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerdeUtils; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Json serializer and deserializer for {@link LakeTableSnapshot}. 
+ * + * <p>This serde supports two storage format versions: + * + * <ul> + * <li>Version 1 (legacy): Each bucket object contains full information including repeated + * partition names and partition_id in each bucket entry. + * <li>Version 2 (current): Compact format that optimizes layout to avoid duplication: + * <ul> + * <li>Extracts partition names to a top-level "partition_names" map + * <li>Groups buckets by partition_id in "buckets" to avoid repeating partition_id in each + * bucket + * <li>Non-partition table uses array format: "buckets": [...] + * <li>Partition table uses object format: "buckets": {"1": [...], "2": [...]}, "1", "2" is + * for the partition id + * </ul> + * Field names remain the same as Version 1, only the layout is optimized. + * </ul> + */ +public class LakeTableSnapshotJsonSerde + implements JsonSerializer<LakeTableSnapshot>, JsonDeserializer<LakeTableSnapshot> { + + public static final LakeTableSnapshotJsonSerde INSTANCE = new LakeTableSnapshotJsonSerde(); + + private static final String VERSION_KEY = "version"; + + private static final String SNAPSHOT_ID = "snapshot_id"; + private static final String TABLE_ID = "table_id"; + private static final String PARTITION_ID = "partition_id"; + private static final String BUCKETS = "buckets"; + private static final String BUCKET_ID = "bucket_id"; + private static final String LOG_START_OFFSET = "log_start_offset"; + private static final String LOG_END_OFFSET = "log_end_offset"; + private static final String MAX_TIMESTAMP = "max_timestamp"; + private static final String PARTITION_NAME = "partition_name"; + + // introduced in v2 + private static final String PARTITION_NAMES = "partition_names"; + private static final int VERSION_1 = 1; + private static final int VERSION_2 = 2; + private static final int CURRENT_VERSION = VERSION_2; + + @Override + public void serialize(LakeTableSnapshot lakeTableSnapshot, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, CURRENT_VERSION); + generator.writeNumberField(SNAPSHOT_ID, lakeTableSnapshot.getSnapshotId()); + generator.writeNumberField(TABLE_ID, lakeTableSnapshot.getTableId()); + + // Extract partition names to top-level map to avoid duplication + Map<Long, String> partitionNameIdByPartitionId = + lakeTableSnapshot.getPartitionNameIdByPartitionId(); + if (!partitionNameIdByPartitionId.isEmpty()) { + generator.writeObjectFieldStart(PARTITION_NAMES); + for (Map.Entry<Long, String> entry : partitionNameIdByPartitionId.entrySet()) { + generator.writeStringField(String.valueOf(entry.getKey()), entry.getValue()); + } + generator.writeEndObject(); + } + + // Group buckets by partition_id to avoid repeating partition_id in each bucket + Map<Long, List<TableBucket>> partitionBuckets = new HashMap<>(); + List<TableBucket> nonPartitionBuckets = new ArrayList<>(); + + for (TableBucket tableBucket : lakeTableSnapshot.getBucketLogEndOffset().keySet()) { + if (tableBucket.getPartitionId() != null) { + partitionBuckets + .computeIfAbsent(tableBucket.getPartitionId(), k -> new ArrayList<>()) + .add(tableBucket); + } else { + nonPartitionBuckets.add(tableBucket); + } + } + + // Serialize buckets: use array for non-partition buckets, object for partition buckets + if (!nonPartitionBuckets.isEmpty() || !partitionBuckets.isEmpty()) { + if (!partitionBuckets.isEmpty()) { + generator.writeObjectFieldStart(BUCKETS); + for (Map.Entry<Long, java.util.List<TableBucket>> entry : + partitionBuckets.entrySet()) { + // Partition table: 
grouped by partition_id, first write partition + generator.writeArrayFieldStart(String.valueOf(entry.getKey())); + for (TableBucket tableBucket : entry.getValue()) { + // write bucket + writeBucketObject(generator, lakeTableSnapshot, tableBucket); + } + generator.writeEndArray(); + } + generator.writeEndObject(); + } else { + // Non-partition table: use array format directly + generator.writeArrayFieldStart(BUCKETS); + for (TableBucket tableBucket : nonPartitionBuckets) { + writeBucketObject(generator, lakeTableSnapshot, tableBucket); + } + generator.writeEndArray(); + } + } + + generator.writeEndObject(); + } + + /** Helper method to write a bucket object. */ + private void writeBucketObject( + JsonGenerator generator, LakeTableSnapshot lakeTableSnapshot, TableBucket tableBucket) + throws IOException { + generator.writeStartObject(); + + generator.writeNumberField(BUCKET_ID, tableBucket.getBucket()); + + // Only include non-null values + if (lakeTableSnapshot.getLogStartOffset(tableBucket).isPresent()) { + generator.writeNumberField( + LOG_START_OFFSET, lakeTableSnapshot.getLogStartOffset(tableBucket).get()); + } + + if (lakeTableSnapshot.getLogEndOffset(tableBucket).isPresent()) { + generator.writeNumberField( + LOG_END_OFFSET, lakeTableSnapshot.getLogEndOffset(tableBucket).get()); + } + + if (lakeTableSnapshot.getMaxTimestamp(tableBucket).isPresent()) { + generator.writeNumberField( + MAX_TIMESTAMP, lakeTableSnapshot.getMaxTimestamp(tableBucket).get()); + } + + generator.writeEndObject(); + } + + @Override + public LakeTableSnapshot deserialize(JsonNode node) { + int version = node.get(VERSION_KEY).asInt(); + if (version == VERSION_1) { + return deserializeVersion1(node); + } else if (version == VERSION_2) { + return deserializeVersion2(node); + } else { + throw new IllegalArgumentException("Unsupported version: " + version); + } + } + + /** Deserialize Version 1 format (legacy). */ + private LakeTableSnapshot deserializeVersion1(JsonNode node) { + long snapshotId = node.get(SNAPSHOT_ID).asLong(); + long tableId = node.get(TABLE_ID).asLong(); + Iterator<JsonNode> buckets = node.get(BUCKETS).elements(); + Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>(); + Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>(); + Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>(); + Map<Long, String> partitionNameIdByPartitionId = new HashMap<>(); + while (buckets.hasNext()) { + JsonNode bucket = buckets.next(); + TableBucket tableBucket; + Long partitionId = + bucket.get(PARTITION_ID) != null ? 
bucket.get(PARTITION_ID).asLong() : null; + tableBucket = new TableBucket(tableId, partitionId, bucket.get(BUCKET_ID).asInt()); + + if (bucket.get(LOG_START_OFFSET) != null) { + bucketLogStartOffset.put(tableBucket, bucket.get(LOG_START_OFFSET).asLong()); + } else { + bucketLogStartOffset.put(tableBucket, null); + } + + if (bucket.get(LOG_END_OFFSET) != null) { + bucketLogEndOffset.put(tableBucket, bucket.get(LOG_END_OFFSET).asLong()); + } else { + bucketLogEndOffset.put(tableBucket, null); + } + + if (bucket.get(MAX_TIMESTAMP) != null) { + bucketMaxTimestamp.put(tableBucket, bucket.get(MAX_TIMESTAMP).asLong()); + } else { + bucketMaxTimestamp.put(tableBucket, null); + } + + if (partitionId != null && bucket.get(PARTITION_NAME) != null) { + partitionNameIdByPartitionId.put( + tableBucket.getPartitionId(), bucket.get(PARTITION_NAME).asText()); + } + } + return new LakeTableSnapshot( + snapshotId, + tableId, + bucketLogStartOffset, + bucketLogEndOffset, + bucketMaxTimestamp, + partitionNameIdByPartitionId); + } + + /** Deserialize Version 2 format (compact layout). */ + private LakeTableSnapshot deserializeVersion2(JsonNode node) { + long snapshotId = node.get(SNAPSHOT_ID).asLong(); + long tableId = node.get(TABLE_ID).asLong(); + + // Load partition names from top-level map + Map<Long, String> partitionNameIdByPartitionId = new HashMap<>(); + if (node.has(PARTITION_NAMES) && node.get(PARTITION_NAMES) != null) { + JsonNode partitionsNode = node.get(PARTITION_NAMES); + Iterator<Map.Entry<String, JsonNode>> partitions = partitionsNode.fields(); + while (partitions.hasNext()) { + Map.Entry<String, JsonNode> entry = partitions.next(); + Long partitionId = Long.parseLong(entry.getKey()); + String partitionName = entry.getValue().asText(); + partitionNameIdByPartitionId.put(partitionId, partitionName); + } + } + + Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>(); + Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>(); + Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>(); + + // Deserialize buckets: array format for non-partition table, object format for partition + // table + if (node.has(BUCKETS) && node.get(BUCKETS) != null) { + JsonNode bucketsNode = node.get(BUCKETS); + if (bucketsNode.isArray()) { + // Non-partition table: array format + Iterator<JsonNode> buckets = bucketsNode.elements(); + while (buckets.hasNext()) { + JsonNode bucket = buckets.next(); + TableBucket tableBucket = + new TableBucket(tableId, bucket.get(BUCKET_ID).asInt()); + readBucketFields( + bucket, + tableBucket, + bucketLogStartOffset, + bucketLogEndOffset, + bucketMaxTimestamp); + } + } else { + // Partition table: object format grouped by partition_id + Iterator<Map.Entry<String, JsonNode>> partitions = bucketsNode.fields(); + while (partitions.hasNext()) { + Map.Entry<String, JsonNode> entry = partitions.next(); + String partitionKey = entry.getKey(); + Long actualPartitionId = Long.parseLong(partitionKey); Review Comment: Potential uncaught 'java.lang.NumberFormatException'. ```suggestion Long actualPartitionId = null; try { actualPartitionId = Long.parseLong(partitionKey); } catch (NumberFormatException e) { System.err.println("Warning: Invalid partition key '" + partitionKey + "' encountered during deserialization. 
Skipping this partition."); continue; } ``` ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.fs.FSDataOutputStream; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.utils.FlussPaths; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.fluss.metrics.registry.MetricRegistry.LOG; + +/** The helper to handle {@link LakeTable}. */ +public class LakeTableHelper { + + private final ZooKeeperClient zkClient; + private final String remoteDataDir; + + public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) { + this.zkClient = zkClient; + this.remoteDataDir = remoteDataDir; + } + + /** + * Upserts a lake table snapshot for the given table. + * + * <p>This method merges the new snapshot with the existing one (if any) and stores it(data in + * remote file, the remote file path in ZK). 
+ * + * @param tableId the table ID + * @param tablePath the table path + * @param lakeTableSnapshot the new snapshot to upsert + * @throws Exception if the operation fails + */ + public void upsertLakeTable( + long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot) + throws Exception { + Optional<LakeTable> optPreviousLakeTable = zkClient.getLakeTable(tableId); + // Merge with previous snapshot if exists + if (optPreviousLakeTable.isPresent()) { + lakeTableSnapshot = + mergeLakeTable( + optPreviousLakeTable.get().toLakeTableSnapshot(), lakeTableSnapshot); + } + + FsPath lakeTableSnapshotFsPath = + storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot); + LakeTable lakeTable = new LakeTable(lakeTableSnapshotFsPath); + try { + zkClient.upsertLakeTable(tableId, lakeTable, optPreviousLakeTable.isPresent()); + } catch (Exception e) { + LOG.warn("Failed to upsert lake table snapshot to zk.", e); + // delete the new lake table snapshot file + deleteFile(lakeTableSnapshotFsPath); + throw e; + } + + if (optPreviousLakeTable.isPresent()) { + FsPath previousLakeSnapshotFsPath = + optPreviousLakeTable.get().getLakeTableLatestSnapshotFileHandle(); + if (previousLakeSnapshotFsPath != null) { + deleteFile(previousLakeSnapshotFsPath); + } + } + } + + private LakeTableSnapshot mergeLakeTable( + LakeTableSnapshot previousLakeTableSnapshot, + LakeTableSnapshot newLakeTableTableSnapshot) { Review Comment: The parameter name `newLakeTableTableSnapshot` contains a typo with duplicated "Table". It should be `newLakeTableSnapshot` for consistency and clarity. ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.fs.FSDataOutputStream; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.utils.FlussPaths; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.fluss.metrics.registry.MetricRegistry.LOG; + +/** The helper to handle {@link LakeTable}. */ +public class LakeTableHelper { + + private final ZooKeeperClient zkClient; + private final String remoteDataDir; + + public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) { + this.zkClient = zkClient; + this.remoteDataDir = remoteDataDir; + } + + /** + * Upserts a lake table snapshot for the given table. 
+ * + * <p>This method merges the new snapshot with the existing one (if any) and stores it(data in Review Comment: Minor grammar issue in the JavaDoc. Change "stores it(data in" to "stores it (data in" - add a space before the opening parenthesis for proper formatting. ```suggestion * <p>This method merges the new snapshot with the existing one (if any) and stores it (data in ``` ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java: ########## @@ -407,6 +399,11 @@ CompletableFuture<Void> stopServices() { exception = ExceptionUtils.firstOrSuppressed(t, exception); } + if (ioExecutor != null) { + // shutdown io executor + ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor); Review Comment: The `ioExecutor` shutdown should be wrapped in a try-catch block to handle potential exceptions, similar to other resource cleanup operations in this method. If an exception is thrown during executor shutdown, it could prevent subsequent cleanup operations from executing. ```java try { if (ioExecutor != null) { // shutdown io executor ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor); } } catch (Throwable t) { exception = ExceptionUtils.firstOrSuppressed(t, exception); } ``` ```suggestion try { if (ioExecutor != null) { // shutdown io executor ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, ioExecutor); } } catch (Throwable t) { exception = ExceptionUtils.firstOrSuppressed(t, exception); ``` ########## fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.ZooKeeperUtils; +import org.apache.fluss.server.zk.data.TableRegistration; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.testutils.common.AllCallbackWrapper; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +/** The UT for {@link LakeTableHelper}. */ +class LakeTableHelperTest { + + @RegisterExtension + public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + private static ZooKeeperClient zookeeperClient; + + @BeforeAll + static void beforeAll() { + zookeeperClient = + ZOO_KEEPER_EXTENSION_WRAPPER + .getCustomExtension() + .getZooKeeperClient(NOPErrorHandler.INSTANCE); + } + + @AfterEach + void afterEach() { + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot(); + } + + @AfterAll + static void afterAll() { + zookeeperClient.close(); + } + + @Test + void testUpsertLakeTableCompatible(@TempDir Path tempDir) throws Exception { + // Create a ZooKeeperClient with REMOTE_DATA_DIR configuration + Configuration conf = new Configuration(); + conf.setString( + ConfigOptions.ZOOKEEPER_ADDRESS, + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString()); + LakeTableHelper lakeTableHelper = new LakeTableHelper(zookeeperClient, tempDir.toString()); + try (ZooKeeperClient zooKeeperClient = + ZooKeeperUtils.startZookeeperClient(conf, NOPErrorHandler.INSTANCE)) { + // first create a table + long tableId = 1; + TablePath tablePath = TablePath.of("test_db", "test_table"); + TableRegistration tableReg = + new TableRegistration( + tableId, + "test table", + Collections.emptyList(), + new TableDescriptor.TableDistribution( + 1, Collections.singletonList("a")), + Collections.emptyMap(), + Collections.emptyMap(), + System.currentTimeMillis(), + System.currentTimeMillis()); + zookeeperClient.registerTable(tablePath, tableReg); + + // Create a legacy version 1 LakeTableSnapshot (full data in ZK) + long snapshotId = 1L; + Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>(); + bucketLogStartOffset.put(new TableBucket(tableId, 0), 10L); + bucketLogStartOffset.put(new TableBucket(tableId, 1), 20L); + + Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>(); + bucketLogEndOffset.put(new TableBucket(tableId, 0), 100L); + bucketLogEndOffset.put(new TableBucket(tableId, 1), 200L); + + Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>(); + bucketMaxTimestamp.put(new TableBucket(tableId, 0), 1000L); + bucketMaxTimestamp.put(new TableBucket(tableId, 1), 2000L); + + Map<Long, String> 
partitionNameIdByPartitionId = new HashMap<>(); + LakeTableSnapshot lakeTableSnapshot = + new LakeTableSnapshot( + snapshotId, + tableId, + bucketLogStartOffset, + bucketLogEndOffset, + bucketMaxTimestamp, + partitionNameIdByPartitionId); + // Write version 1 format data directly to ZK (simulating old system behavior) + String zkPath = ZkData.LakeTableZNode.path(tableId); + byte[] version1Data = LakeTableSnapshotJsonSerde.toJsonVersion1(lakeTableSnapshot); + zooKeeperClient + .getCuratorClient() + .create() + .creatingParentsIfNeeded() + .forPath(zkPath, version1Data); + + // Verify version 1 data can be read + Optional<LakeTable> optionalLakeTable = zooKeeperClient.getLakeTable(tableId); + assertThat(optionalLakeTable).isPresent(); + LakeTable lakeTable = optionalLakeTable.get(); + assertThat(lakeTable.toLakeTableSnapshot()).isEqualTo(lakeTableSnapshot); + + // Test: Call upsertLakeTableSnapshot with new snapshot data + // This should read the old version 1 data, merge it, and write as version 2 + Map<TableBucket, Long> newBucketLogEndOffset = new HashMap<>(); + newBucketLogEndOffset.put(new TableBucket(tableId, 0), 1500L); // Updated offset + newBucketLogEndOffset.put(new TableBucket(tableId, 1), 2000L); // new offset + + long snapshot2Id = 2L; + LakeTableSnapshot snapshot2 = + new LakeTableSnapshot( + snapshot2Id, + tableId, + Collections.emptyMap(), + newBucketLogEndOffset, + Collections.emptyMap(), + Collections.emptyMap()); + lakeTableHelper.upsertLakeTable(tableId, tablePath, snapshot2); + + // Verify: New version 2 data can be read + Optional<LakeTable> optLakeTableAfter = zooKeeperClient.getLakeTable(tableId); + assertThat(optLakeTableAfter).isPresent(); + LakeTable lakeTableAfter = optLakeTableAfter.get(); + assertThat(lakeTableAfter.getLakeTableLatestSnapshotFileHandle()) + .isNotNull(); // Version 2 has file path + + // Verify: The lake snapshot file exists + FsPath snapshot2FileHandle = lakeTableAfter.getLakeTableLatestSnapshotFileHandle(); + FileSystem fileSystem = snapshot2FileHandle.getFileSystem(); + assertThat(fileSystem.exists(snapshot2FileHandle)).isTrue(); + + Optional<LakeTableSnapshot> optMergedSnapshot = + zooKeeperClient.getLakeTableSnapshot(tableId); + assertThat(optMergedSnapshot).isPresent(); + LakeTableSnapshot mergedSnapshot = optMergedSnapshot.get(); + + // verify the snapshot should merge previous snapshot + assertThat(mergedSnapshot.getSnapshotId()).isEqualTo(snapshot2Id); + assertThat(mergedSnapshot.getTableId()).isEqualTo(tableId); + assertThat(mergedSnapshot.getBucketLogStartOffset()).isEqualTo(bucketLogStartOffset); + + Map<TableBucket, Long> expectedBucketLogEndOffset = new HashMap<>(bucketLogEndOffset); + expectedBucketLogEndOffset.putAll(newBucketLogEndOffset); + assertThat(mergedSnapshot.getBucketLogEndOffset()) + .isEqualTo(expectedBucketLogEndOffset); + assertThat(mergedSnapshot.getBucketMaxTimestamp()).isEqualTo(bucketMaxTimestamp); + assertThat(mergedSnapshot.getPartitionNameIdByPartitionId()) + .isEqualTo(partitionNameIdByPartitionId); + + // add a new snapshot 3 again, verify snapshot + long snapshot3Id = 2L; Review Comment: The snapshot ID should be `3L` instead of `2L`. This is snapshot 3, and using the same ID as snapshot 2 would be incorrect and could lead to confusion or incorrect behavior in the test. 
```suggestion long snapshot3Id = 3L; ``` ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java: ########## @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerdeUtils; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Json serializer and deserializer for {@link LakeTableSnapshot}. + * + * <p>This serde supports two storage format versions: + * + * <ul> + * <li>Version 1 (legacy): Each bucket object contains full information including repeated + * partition names and partition_id in each bucket entry. + * <li>Version 2 (current): Compact format that optimizes layout to avoid duplication: + * <ul> + * <li>Extracts partition names to a top-level "partition_names" map + * <li>Groups buckets by partition_id in "buckets" to avoid repeating partition_id in each + * bucket + * <li>Non-partition table uses array format: "buckets": [...] + * <li>Partition table uses object format: "buckets": {"1": [...], "2": [...]}, "1", "2" is + * for the partition id + * </ul> + * Field names remain the same as Version 1, only the layout is optimized. 
+ * </ul> + */ +public class LakeTableSnapshotJsonSerde + implements JsonSerializer<LakeTableSnapshot>, JsonDeserializer<LakeTableSnapshot> { + + public static final LakeTableSnapshotJsonSerde INSTANCE = new LakeTableSnapshotJsonSerde(); + + private static final String VERSION_KEY = "version"; + + private static final String SNAPSHOT_ID = "snapshot_id"; + private static final String TABLE_ID = "table_id"; + private static final String PARTITION_ID = "partition_id"; + private static final String BUCKETS = "buckets"; + private static final String BUCKET_ID = "bucket_id"; + private static final String LOG_START_OFFSET = "log_start_offset"; + private static final String LOG_END_OFFSET = "log_end_offset"; + private static final String MAX_TIMESTAMP = "max_timestamp"; + private static final String PARTITION_NAME = "partition_name"; + + // introduced in v2 + private static final String PARTITION_NAMES = "partition_names"; + private static final int VERSION_1 = 1; + private static final int VERSION_2 = 2; + private static final int CURRENT_VERSION = VERSION_2; + + @Override + public void serialize(LakeTableSnapshot lakeTableSnapshot, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, CURRENT_VERSION); + generator.writeNumberField(SNAPSHOT_ID, lakeTableSnapshot.getSnapshotId()); + generator.writeNumberField(TABLE_ID, lakeTableSnapshot.getTableId()); + + // Extract partition names to top-level map to avoid duplication + Map<Long, String> partitionNameIdByPartitionId = + lakeTableSnapshot.getPartitionNameIdByPartitionId(); + if (!partitionNameIdByPartitionId.isEmpty()) { + generator.writeObjectFieldStart(PARTITION_NAMES); + for (Map.Entry<Long, String> entry : partitionNameIdByPartitionId.entrySet()) { + generator.writeStringField(String.valueOf(entry.getKey()), entry.getValue()); + } + generator.writeEndObject(); + } + + // Group buckets by partition_id to avoid repeating partition_id in each bucket + Map<Long, List<TableBucket>> partitionBuckets = new HashMap<>(); + List<TableBucket> nonPartitionBuckets = new ArrayList<>(); + + for (TableBucket tableBucket : lakeTableSnapshot.getBucketLogEndOffset().keySet()) { + if (tableBucket.getPartitionId() != null) { + partitionBuckets + .computeIfAbsent(tableBucket.getPartitionId(), k -> new ArrayList<>()) + .add(tableBucket); + } else { + nonPartitionBuckets.add(tableBucket); + } + } + + // Serialize buckets: use array for non-partition buckets, object for partition buckets + if (!nonPartitionBuckets.isEmpty() || !partitionBuckets.isEmpty()) { + if (!partitionBuckets.isEmpty()) { + generator.writeObjectFieldStart(BUCKETS); + for (Map.Entry<Long, java.util.List<TableBucket>> entry : + partitionBuckets.entrySet()) { + // Partition table: grouped by partition_id, first write partition + generator.writeArrayFieldStart(String.valueOf(entry.getKey())); + for (TableBucket tableBucket : entry.getValue()) { + // write bucket + writeBucketObject(generator, lakeTableSnapshot, tableBucket); + } + generator.writeEndArray(); + } + generator.writeEndObject(); + } else { + // Non-partition table: use array format directly + generator.writeArrayFieldStart(BUCKETS); + for (TableBucket tableBucket : nonPartitionBuckets) { + writeBucketObject(generator, lakeTableSnapshot, tableBucket); + } + generator.writeEndArray(); + } + } + + generator.writeEndObject(); + } + + /** Helper method to write a bucket object. 
*/ + private void writeBucketObject( + JsonGenerator generator, LakeTableSnapshot lakeTableSnapshot, TableBucket tableBucket) + throws IOException { + generator.writeStartObject(); + + generator.writeNumberField(BUCKET_ID, tableBucket.getBucket()); + + // Only include non-null values + if (lakeTableSnapshot.getLogStartOffset(tableBucket).isPresent()) { + generator.writeNumberField( + LOG_START_OFFSET, lakeTableSnapshot.getLogStartOffset(tableBucket).get()); + } + + if (lakeTableSnapshot.getLogEndOffset(tableBucket).isPresent()) { + generator.writeNumberField( + LOG_END_OFFSET, lakeTableSnapshot.getLogEndOffset(tableBucket).get()); + } + + if (lakeTableSnapshot.getMaxTimestamp(tableBucket).isPresent()) { + generator.writeNumberField( + MAX_TIMESTAMP, lakeTableSnapshot.getMaxTimestamp(tableBucket).get()); + } + + generator.writeEndObject(); + } + + @Override + public LakeTableSnapshot deserialize(JsonNode node) { + int version = node.get(VERSION_KEY).asInt(); + if (version == VERSION_1) { + return deserializeVersion1(node); + } else if (version == VERSION_2) { + return deserializeVersion2(node); + } else { + throw new IllegalArgumentException("Unsupported version: " + version); + } + } + + /** Deserialize Version 1 format (legacy). */ + private LakeTableSnapshot deserializeVersion1(JsonNode node) { + long snapshotId = node.get(SNAPSHOT_ID).asLong(); + long tableId = node.get(TABLE_ID).asLong(); + Iterator<JsonNode> buckets = node.get(BUCKETS).elements(); + Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>(); + Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>(); + Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>(); + Map<Long, String> partitionNameIdByPartitionId = new HashMap<>(); + while (buckets.hasNext()) { + JsonNode bucket = buckets.next(); + TableBucket tableBucket; + Long partitionId = + bucket.get(PARTITION_ID) != null ? bucket.get(PARTITION_ID).asLong() : null; + tableBucket = new TableBucket(tableId, partitionId, bucket.get(BUCKET_ID).asInt()); + + if (bucket.get(LOG_START_OFFSET) != null) { + bucketLogStartOffset.put(tableBucket, bucket.get(LOG_START_OFFSET).asLong()); + } else { + bucketLogStartOffset.put(tableBucket, null); + } + + if (bucket.get(LOG_END_OFFSET) != null) { + bucketLogEndOffset.put(tableBucket, bucket.get(LOG_END_OFFSET).asLong()); + } else { + bucketLogEndOffset.put(tableBucket, null); + } + + if (bucket.get(MAX_TIMESTAMP) != null) { + bucketMaxTimestamp.put(tableBucket, bucket.get(MAX_TIMESTAMP).asLong()); + } else { + bucketMaxTimestamp.put(tableBucket, null); + } + + if (partitionId != null && bucket.get(PARTITION_NAME) != null) { + partitionNameIdByPartitionId.put( + tableBucket.getPartitionId(), bucket.get(PARTITION_NAME).asText()); + } + } + return new LakeTableSnapshot( + snapshotId, + tableId, + bucketLogStartOffset, + bucketLogEndOffset, + bucketMaxTimestamp, + partitionNameIdByPartitionId); + } + + /** Deserialize Version 2 format (compact layout). 
 */ + private LakeTableSnapshot deserializeVersion2(JsonNode node) { + long snapshotId = node.get(SNAPSHOT_ID).asLong(); + long tableId = node.get(TABLE_ID).asLong(); + + // Load partition names from top-level map + Map<Long, String> partitionNameIdByPartitionId = new HashMap<>(); + if (node.has(PARTITION_NAMES) && node.get(PARTITION_NAMES) != null) { + JsonNode partitionsNode = node.get(PARTITION_NAMES); + Iterator<Map.Entry<String, JsonNode>> partitions = partitionsNode.fields(); + while (partitions.hasNext()) { + Map.Entry<String, JsonNode> entry = partitions.next(); + Long partitionId = Long.parseLong(entry.getKey()); Review Comment: Potential uncaught 'java.lang.NumberFormatException'.
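A minimal, self-contained sketch (not part of the PR and not one of the review suggestions above) of one way the partition-key parsing flagged by the two NumberFormatException comments could be hardened: fail fast with a descriptive `IllegalArgumentException` instead of letting a bare `NumberFormatException` escape or printing to stderr. The class name `PartitionKeyParsing` and the helper `parsePartitionId` are hypothetical, and the shaded Jackson types are left out so the snippet runs on its own.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper illustrating defensive parsing of the numeric string keys used by the
 * version-2 layout, where "partition_names" and the "buckets" object are keyed by partition id.
 * A malformed key fails fast with a descriptive message instead of surfacing a bare
 * NumberFormatException or being silently skipped.
 */
public final class PartitionKeyParsing {

    private PartitionKeyParsing() {}

    static long parsePartitionId(String partitionKey) {
        try {
            return Long.parseLong(partitionKey);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid partition id key '" + partitionKey
                            + "' in lake table snapshot JSON; expected a long value.",
                    e);
        }
    }

    public static void main(String[] args) {
        // Example input, e.g. {"partition_names": {"1": "2024-01-01", "2": "2024-01-02"}}
        Map<String, String> rawPartitionNames = new HashMap<>();
        rawPartitionNames.put("1", "2024-01-01");
        rawPartitionNames.put("2", "2024-01-02");

        Map<Long, String> partitionNameById = new HashMap<>();
        for (Map.Entry<String, String> entry : rawPartitionNames.entrySet()) {
            partitionNameById.put(parsePartitionId(entry.getKey()), entry.getValue());
        }
        System.out.println(partitionNameById); // {1=2024-01-01, 2=2024-01-02}
    }
}
```

Whether to fail fast like this or to skip the malformed entry (as the earlier suggestion does with `continue`) is a maintainer call; failing fast avoids silently dropping snapshot state for a partition.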
