wuchong commented on code in PR #2037:
URL: https://github.com/apache/fluss/pull/2037#discussion_r2579832168
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1146,50 +1172,52 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
         }
     }
-    private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot(
-            CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent) {
+    private void tryProcessCommitLakeTableSnapshot(
+            CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
+            CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+        // commit the lake table snapshot asynchronously
         CommitLakeTableSnapshotData commitLakeTableSnapshotData =
                 commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
-        CommitLakeTableSnapshotResponse response = new CommitLakeTableSnapshotResponse();
         Map<Long, LakeTableSnapshot> lakeTableSnapshots =
                 commitLakeTableSnapshotData.getLakeTableSnapshot();
-        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
-                lakeTableSnapshots.entrySet()) {
-            Long tableId = lakeTableSnapshotEntry.getKey();
-
-            PbCommitLakeTableSnapshotRespForTable tableResp = response.addTableResp();
-            tableResp.setTableId(tableId);
-
-            try {
-                zooKeeperClient.upsertLakeTableSnapshot(tableId, lakeTableSnapshotEntry.getValue());
-            } catch (Exception e) {
-                ApiError error = ApiError.fromThrowable(e);
-                tableResp.setError(error.error().code(), error.message());
-            }
-        }
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        CommitLakeTableSnapshotResponse response =
+                                new CommitLakeTableSnapshotResponse();
+                        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
+                                lakeTableSnapshots.entrySet()) {
+                            Long tableId = lakeTableSnapshotEntry.getKey();
+
+                            PbCommitLakeTableSnapshotRespForTable tableResp =
+                                    response.addTableResp();
+                            tableResp.setTableId(tableId);
+
+                            try {
+                                TablePath tablePath =
+                                        coordinatorContext.getTablePathById(tableId);
+                                if (tablePath == null) {
+                                    throw new RuntimeException(
+                                            String.format(
+                                                    "Failed to find table path for table id: %d",
+                                                    tableId));
+                                }
+                                // this involves IO operation (ZK), so we do it in ioExecutor
Review Comment:
Extract the remote file writing out of the `zooKeeperClient.upsertLakeTableSnapshot` method. It is very confusing that a ZK operation performs heavy remote file IO, and neither the method name nor the comment above reveals that. The ZK method should only take a remote path and record it into ZK.
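A rough sketch of the shape I have in mind (the writer component and method names below are placeholders, not existing APIs):
```java
// hypothetical: a dedicated component owns the heavy remote file IO
FsPath snapshotFile =
        lakeSnapshotFileWriter.writeSnapshotFile(
                tablePath, tableId, lakeTableSnapshotEntry.getValue());
// the ZK method then only records the remote path, so its cost is obvious
zooKeeperClient.upsertLakeTableSnapshot(tableId, snapshotFile);
```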
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1146,50 +1172,52 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
         }
     }
-    private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot(
-            CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent) {
+    private void tryProcessCommitLakeTableSnapshot(
+            CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
+            CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+        // commit the lake table snapshot asynchronously
         CommitLakeTableSnapshotData commitLakeTableSnapshotData =
                 commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
-        CommitLakeTableSnapshotResponse response = new CommitLakeTableSnapshotResponse();
         Map<Long, LakeTableSnapshot> lakeTableSnapshots =
                 commitLakeTableSnapshotData.getLakeTableSnapshot();
-        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
-                lakeTableSnapshots.entrySet()) {
-            Long tableId = lakeTableSnapshotEntry.getKey();
-
-            PbCommitLakeTableSnapshotRespForTable tableResp = response.addTableResp();
-            tableResp.setTableId(tableId);
-
-            try {
-                zooKeeperClient.upsertLakeTableSnapshot(tableId, lakeTableSnapshotEntry.getValue());
-            } catch (Exception e) {
-                ApiError error = ApiError.fromThrowable(e);
-                tableResp.setError(error.error().code(), error.message());
-            }
-        }
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        CommitLakeTableSnapshotResponse response =
+                                new CommitLakeTableSnapshotResponse();
+                        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
+                                lakeTableSnapshots.entrySet()) {
+                            Long tableId = lakeTableSnapshotEntry.getKey();
+
+                            PbCommitLakeTableSnapshotRespForTable tableResp =
+                                    response.addTableResp();
+                            tableResp.setTableId(tableId);
+
+                            try {
+                                TablePath tablePath =
+                                        coordinatorContext.getTablePathById(tableId);
Review Comment:
`coordinatorContext` is not thread-safe and can't be accessed from other threads. We should resolve the table paths before submitting the task to `ioExecutor`.
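For example (a sketch, assuming the surrounding variables from this method):
```java
// resolve all table paths on the coordinator event thread, before going async
Map<Long, TablePath> tablePathById = new HashMap<>();
for (Long tableId : lakeTableSnapshots.keySet()) {
    tablePathById.put(tableId, coordinatorContext.getTablePathById(tableId));
}
ioExecutor.execute(
        () -> {
            // only the pre-resolved tablePathById map is referenced here;
            // coordinatorContext is never touched off the event thread
        });
```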
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1146,50 +1172,52 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
         }
     }
-    private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot(
-            CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent) {
+    private void tryProcessCommitLakeTableSnapshot(
+            CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
+            CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+        // commit the lake table snapshot asynchronously
         CommitLakeTableSnapshotData commitLakeTableSnapshotData =
                 commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
-        CommitLakeTableSnapshotResponse response = new CommitLakeTableSnapshotResponse();
         Map<Long, LakeTableSnapshot> lakeTableSnapshots =
                 commitLakeTableSnapshotData.getLakeTableSnapshot();
-        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
-                lakeTableSnapshots.entrySet()) {
-            Long tableId = lakeTableSnapshotEntry.getKey();
-
-            PbCommitLakeTableSnapshotRespForTable tableResp = response.addTableResp();
-            tableResp.setTableId(tableId);
-
-            try {
-                zooKeeperClient.upsertLakeTableSnapshot(tableId, lakeTableSnapshotEntry.getValue());
-            } catch (Exception e) {
-                ApiError error = ApiError.fromThrowable(e);
-                tableResp.setError(error.error().code(), error.message());
-            }
-        }
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        CommitLakeTableSnapshotResponse response =
+                                new CommitLakeTableSnapshotResponse();
+                        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
+                                lakeTableSnapshots.entrySet()) {
+                            Long tableId = lakeTableSnapshotEntry.getKey();
+
+                            PbCommitLakeTableSnapshotRespForTable tableResp =
+                                    response.addTableResp();
+                            tableResp.setTableId(tableId);
+
+                            try {
+                                TablePath tablePath =
+                                        coordinatorContext.getTablePathById(tableId);
+                                if (tablePath == null) {
+                                    throw new RuntimeException(
+                                            String.format(
+                                                    "Failed to find table path for table id: %d",
+                                                    tableId));
+                                }
+                                // this involves IO operation (ZK), so we do it in ioExecutor
+                                zooKeeperClient.upsertLakeTableSnapshot(
+                                        tableId, tablePath, lakeTableSnapshotEntry.getValue());
+                            } catch (Exception e) {
+                                ApiError error = ApiError.fromThrowable(e);
+                                tableResp.setError(error.error().code(), error.message());
+                            }
+                        }
-        // send notify lakehouse data request to all replicas.
-        coordinatorRequestBatch.newBatch();
-        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
-                lakeTableSnapshots.entrySet()) {
-            LakeTableSnapshot lakeTableSnapshot = lakeTableSnapshotEntry.getValue();
-            for (Map.Entry<TableBucket, Long> bucketLogEndOffsetEntry :
-                    lakeTableSnapshot.getBucketLogEndOffset().entrySet()) {
-                TableBucket tb = bucketLogEndOffsetEntry.getKey();
-                coordinatorContext
-                        .getBucketLeaderAndIsr(bucketLogEndOffsetEntry.getKey())
-                        .ifPresent(
-                                leaderAndIsr ->
-                                        coordinatorRequestBatch
-                                                .addNotifyLakeTableOffsetRequestForTableServers(
-                                                        coordinatorContext.getAssignment(tb),
-                                                        tb,
-                                                        lakeTableSnapshot));
-            }
-        }
-        coordinatorRequestBatch.sendNotifyLakeTableOffsetRequest(
-                coordinatorContext.getCoordinatorEpoch());
-        return response;
+                        // send notify lakehouse data request to all replicas via coordinator event
+                        coordinatorEventManager.put(
+                                new NotifyLakeTableOffsetEvent(lakeTableSnapshots));
Review Comment:
This may introduce out-of-order processing of `NotifyLakeTableOffsetEvent`. We may need to add an idempotency check for the lake snapshot offsets, e.g., verify that the incoming snapshot id is larger than the current `lakeTableSnapshotId`; only then can we safely apply the log offsets.
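Roughly (a sketch on the applying side; the field and helper names are illustrative, not existing code):
```java
// hypothetical guard before applying a NotifyLakeTableOffsetEvent
long incomingSnapshotId = lakeTableSnapshot.getSnapshotId();
if (incomingSnapshotId <= currentLakeTableSnapshotId) {
    // a stale event was delivered out of order; applying it would move the
    // lake offsets backwards, so it is safe to simply drop it
    return;
}
currentLakeTableSnapshotId = incomingSnapshotId;
applyLakeTableLogOffsets(lakeTableSnapshot); // illustrative helper
```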
##########
fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java:
##########
@@ -576,4 +584,117 @@ void testZookeeperConfigPath() throws Exception {
                     .isEqualTo("zookeeper2");
         }
     }
+
+    @Test
+    void testUpsertLakeTableSnapshotCompatible(@TempDir Path tempDir) throws Exception {
+        // Create a ZooKeeperClient with REMOTE_DATA_DIR configuration
+        Configuration conf = new Configuration();
+        conf.setString(
+                ConfigOptions.ZOOKEEPER_ADDRESS,
+                ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString());
+        conf.set(ConfigOptions.REMOTE_DATA_DIR, tempDir.toAbsolutePath().toString());
+        try (ZooKeeperClient zooKeeperClient =
+                ZooKeeperUtils.startZookeeperClient(conf, NOPErrorHandler.INSTANCE)) {
+            // first create a table
+            long tableId = 1;
+            TablePath tablePath = TablePath.of("test_db", "test_table");
+            TableRegistration tableReg =
+                    new TableRegistration(
+                            tableId,
+                            "test table",
+                            Collections.emptyList(),
+                            new TableDescriptor.TableDistribution(
+                                    1, Collections.singletonList("a")),
+                            Collections.emptyMap(),
+                            Collections.emptyMap(),
+                            System.currentTimeMillis(),
+                            System.currentTimeMillis());
+            zookeeperClient.registerTable(tablePath, tableReg);
+
+            // Create a legacy version 1 LakeTableSnapshot (full data in ZK)
+            long snapshotId = 1L;
+            Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
+            bucketLogStartOffset.put(new TableBucket(tableId, 0), 10L);
+            bucketLogStartOffset.put(new TableBucket(tableId, 1), 20L);
+
+            Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+            bucketLogEndOffset.put(new TableBucket(tableId, 0), 100L);
+            bucketLogEndOffset.put(new TableBucket(tableId, 1), 200L);
+
+            Map<TableBucket, Long> bucketMaxTimestamp = new HashMap<>();
+            bucketMaxTimestamp.put(new TableBucket(tableId, 0), 1000L);
+            bucketMaxTimestamp.put(new TableBucket(tableId, 1), 2000L);
+
+            Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
+            LakeTableSnapshot lakeTableSnapshot =
+                    new LakeTableSnapshot(
+                            snapshotId,
+                            tableId,
+                            bucketLogStartOffset,
+                            bucketLogEndOffset,
+                            bucketMaxTimestamp,
+                            partitionNameIdByPartitionId);
+            // Write version 1 format data directly to ZK (simulating old system behavior)
+            String zkPath = LakeTableZNode.path(tableId);
+            byte[] version1Data = LakeTableSnapshotJsonSerde.toJson(lakeTableSnapshot);
+            zooKeeperClient
+                    .getCuratorClient()
+                    .create()
+                    .creatingParentsIfNeeded()
+                    .forPath(zkPath, version1Data);
+
+            // Verify version 1 data can be read
+            Optional<LakeTable> optionalLakeTable = zooKeeperClient.getLakeTable(tableId);
+            assertThat(optionalLakeTable).isPresent();
+            LakeTable lakeTable = optionalLakeTable.get();
+            assertThat(lakeTable.toLakeTableSnapshot()).isEqualTo(lakeTableSnapshot);
+
+            // Test: Call upsertLakeTableSnapshot with new snapshot data
+            // This should read the old version 1 data, merge it, and write as version 2
+            Map<TableBucket, Long> newBucketLogEndOffset = new HashMap<>();
+            newBucketLogEndOffset.put(new TableBucket(tableId, 0), 1500L); // Updated offset
+            newBucketLogEndOffset.put(new TableBucket(tableId, 1), 2000L); // new offset
+
+            long newSnapshotId = 2L;
+            LakeTableSnapshot newSnapshot =
+                    new LakeTableSnapshot(
+                            newSnapshotId,
+                            tableId,
+                            Collections.emptyMap(),
+                            newBucketLogEndOffset,
+                            Collections.emptyMap(),
+                            Collections.emptyMap());
+            zooKeeperClient.upsertLakeTableSnapshot(tableId, tablePath, newSnapshot);
+
+            // Verify: New version 2 data can be read
+            Optional<LakeTable> optLakeTableAfter = zooKeeperClient.getLakeTable(tableId);
+            assertThat(optLakeTableAfter).isPresent();
+            LakeTable lakeTableAfter = optLakeTableAfter.get();
+            assertThat(lakeTableAfter.getLakeTableSnapshotFileHandle())
+                    .isNotNull(); // Version 2 has file path
+
+            // Verify: The lake snapshot file exists
+            FsPath metadataPath = lakeTableAfter.getLakeTableSnapshotFileHandle();
+            FileSystem fileSystem = metadataPath.getFileSystem();
+            assertThat(fileSystem.exists(metadataPath)).isTrue();
+
+            Optional<LakeTableSnapshot> optMergedSnapshot =
+                    zooKeeperClient.getLakeTableSnapshot(tableId);
+            assertThat(optMergedSnapshot).isPresent();
+            LakeTableSnapshot mergedSnapshot = optMergedSnapshot.get();
+
+            // verify the snapshot should merge previous snapshot
+            assertThat(mergedSnapshot.getSnapshotId()).isEqualTo(newSnapshotId);
+            assertThat(mergedSnapshot.getTableId()).isEqualTo(tableId);
+            assertThat(mergedSnapshot.getBucketLogStartOffset()).isEqualTo(bucketLogStartOffset);
+
+            Map<TableBucket, Long> expectedBucketLogEndOffset = new HashMap<>(bucketLogEndOffset);
+            expectedBucketLogEndOffset.putAll(newBucketLogEndOffset);
+            assertThat(mergedSnapshot.getBucketLogEndOffset())
+                    .isEqualTo(expectedBucketLogEndOffset);
+            assertThat(mergedSnapshot.getBucketMaxTimestamp()).isEqualTo(bucketMaxTimestamp);
+            assertThat(mergedSnapshot.getPartitionNameIdByPartitionId())
+                    .isEqualTo(partitionNameIdByPartitionId);
Review Comment:
Add one more snapshot and verify that the previous snapshot's metadata file is successfully deleted.
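Something along these lines, reusing the variables above (a sketch assuming a third snapshot with id 3 and synchronous cleanup):
```java
// remember the current metadata file before committing a newer snapshot
FsPath previousMetadataPath = lakeTableAfter.getLakeTableSnapshotFileHandle();
LakeTableSnapshot thirdSnapshot =
        new LakeTableSnapshot(
                3L,
                tableId,
                Collections.emptyMap(),
                Collections.emptyMap(),
                Collections.emptyMap(),
                Collections.emptyMap());
zooKeeperClient.upsertLakeTableSnapshot(tableId, tablePath, thirdSnapshot);
// the superseded snapshot metadata file should be deleted
assertThat(fileSystem.exists(previousMetadataPath)).isFalse();
```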
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lake;
+
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Json serializer and deserializer for {@link LakeTable}.
+ *
+ * <p>This serde supports two storage format versions:
+ *
+ * <ul>
+ *   <li>Version 1 (legacy): ZK node contains full {@link LakeTableSnapshot} data. During
+ *       deserialization, it uses {@link LakeTableSnapshotJsonSerde} to deserialize and wraps the
+ *       result in a {@link LakeTable}.
+ *   <li>Version 2 (current): ZK node contains only the metadata file path. The actual snapshot
+ *       data is stored in a remote file.
+ * </ul>
+ */
+public class LakeTableJsonSerde implements JsonSerializer<LakeTable>, JsonDeserializer<LakeTable> {
+
+    public static final LakeTableJsonSerde INSTANCE = new LakeTableJsonSerde();
+
+    private static final String VERSION_KEY = "version";
+    private static final String METADATA_PATH_KEY = "metadata_path";
+    private static final int VERSION_1 = 1;
+    private static final int VERSION_2 = 2;
+    private static final int CURRENT_VERSION = VERSION_2;
+
+    @Override
+    public void serialize(LakeTable lakeTable, JsonGenerator generator) throws IOException {
+        generator.writeStartObject();
+        generator.writeNumberField(VERSION_KEY, CURRENT_VERSION);
+
+        FsPath lakeTableSnapshotFileHandle = lakeTable.getLakeTableSnapshotFileHandle();
+        checkNotNull(lakeTableSnapshotFileHandle);
+        generator.writeStringField(METADATA_PATH_KEY, lakeTableSnapshotFileHandle.toString());
+        generator.writeEndObject();
+    }
+
+    @Override
+    public LakeTable deserialize(JsonNode node) {
+        int version = node.get(VERSION_KEY).asInt();
+        if (version == VERSION_1) {
+            // Version 1: ZK node contains full snapshot data, use LakeTableSnapshotJsonSerde
+            LakeTableSnapshot snapshot = LakeTableSnapshotJsonSerde.INSTANCE.deserialize(node);
+            return new LakeTable(snapshot);
+        } else if (version == VERSION_2) {
+            // Version 2: ZK node contains only metadata file path
+            if (!node.has(METADATA_PATH_KEY) || node.get(METADATA_PATH_KEY).isNull()) {
+                throw new IllegalArgumentException(
+                        "Version 2 ZK node must have non-null 'metadata_path' field");
Review Comment:
Add the JSON node string to the exception message for easier debugging?
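e.g. (sketch; `JsonNode#toString()` renders the JSON text):
```java
throw new IllegalArgumentException(
        "Version 2 ZK node must have non-null 'metadata_path' field, but got: " + node);
```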
##########
fluss-common/src/main/java/org/apache/fluss/utils/FlussPaths.java:
##########
@@ -681,6 +683,28 @@ public static FsPath remoteKvSnapshotDir(FsPath remoteKvTabletDir, long snapshot
         return new FsPath(remoteKvTabletDir, REMOTE_KV_SNAPSHOT_DIR_PREFIX + snapshotId);
     }
+    /**
+     * Returns the remote path for storing lake snapshot metadata required by Fluss for a table.
+     *
+     * <p>The path contract:
+     *
+     * <pre>
+     * {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/snapshot/{snapshotId}
Review Comment:
Better to use `{snapshotId}.snapshot` as the file name to make the file's purpose explicit, since we may have more files under this directory.
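i.e., something like (a sketch; the directory variable is illustrative):
```java
// {$remote.data.dir}/lake/{databaseName}/{tableName}-{tableId}/snapshot/{snapshotId}.snapshot
return new FsPath(remoteLakeTableSnapshotDir, snapshotId + ".snapshot");
```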