wuchong commented on code in PR #2223:
URL: https://github.com/apache/fluss/pull/2223#discussion_r2647869648
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -947,11 +949,53 @@ message PbModifyColumn {
optional int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3
}
-
-
message PbDescribeConfig {
required string config_key = 1;
optional string config_value = 2;
required string config_source = 3;
}
+message PbLakeTableSnapshotMetadata {
+ required int64 table_id = 1;
+ required int64 snapshot_id = 2;
+ required string tiered_bucket_offsets_file_path = 3;
+ optional string readable_bucket_offsets_file_path = 4;
+}
+
+message PbLakeTableSnapshotInfo {
+ optional int64 table_id = 1;
+ required int64 snapshot_id = 2;
+ repeated PbLakeTableOffsetForBucket buckets_req = 3;
+ // add table path to reduce get table_path by table id
+ optional PbTablePath table_path = 4;
+}
+
+message PbLakeTableOffsetForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+ // Deprecated: log_start_offset is no longer used. Field number 3 is reserved for protocol compatibility.
+ // optional int64 log_start_offset = 3;
+ optional int64 log_end_offset = 4;
+ // Deprecated: partition_name is no longer used. Field number 5 is reserved for protocol compatibility.
+ // optional string partition_name = 5;
+ optional int64 max_timestamp = 6;
+}
+
+message PbPrepareCommitLakeTableRespForTable {
Review Comment:
Add a `table_id` field: the `PrepareCommitLakeTableSnapshotRequest` carries multiple table ids, so we need to distinguish which table each `PbPrepareCommitLakeTableRespForTable` belongs to.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1224,6 +1225,48 @@ private <T> void
processAccessContext(AccessContextEvent<T> event) {
private void tryProcessCommitLakeTableSnapshot(
CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+ CommitLakeTableSnapshotData commitLakeTableSnapshotData =
+ commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
+ if
(commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas().isEmpty()) {
+ handleCommitLakeTableSnapshotV1(commitLakeTableSnapshotEvent,
callback);
+ } else {
+ Map<Long, LakeTable.LakeSnapshotMetadata> lakeSnapshotMetadatas =
Review Comment:
Extract the else code block into a `handleCommitLakeTableSnapshotV2()` method, e.g. as sketched below.
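For reference, a rough sketch of the extraction (the body is just the existing else branch from this hunk; the tail of the lambda is not visible here, so the closing error handling is an assumption):

```java
private void handleCommitLakeTableSnapshotV2(
        CommitLakeTableSnapshotData commitLakeTableSnapshotData,
        CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
    Map<Long, LakeTable.LakeSnapshotMetadata> lakeSnapshotMetadatas =
            commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas();
    ioExecutor.execute(
            () -> {
                try {
                    CommitLakeTableSnapshotResponse response =
                            new CommitLakeTableSnapshotResponse();
                    for (Map.Entry<Long, LakeTable.LakeSnapshotMetadata> entry :
                            lakeSnapshotMetadatas.entrySet()) {
                        PbCommitLakeTableSnapshotRespForTable tableResp = response.addTableResp();
                        long tableId = entry.getKey();
                        tableResp.setTableId(tableId);
                        try {
                            // register the v2 snapshot metadata in ZK
                            lakeTableHelper.addLakeTableSnapshotMetadata(tableId, entry.getValue());
                        } catch (Exception e) {
                            ApiError error = ApiError.fromThrowable(e);
                            tableResp.setError(error.error().code(), error.message());
                        }
                    }
                    // ... put the NotifyLakeTableOffsetEvent and complete the callback,
                    // exactly as the existing else branch does ...
                } catch (Throwable t) {
                    // assumed: keep whatever error handling the existing code has here
                    callback.completeExceptionally(t);
                }
            });
}
```

`tryProcessCommitLakeTableSnapshot` then only decides between `handleCommitLakeTableSnapshotV1(...)` and `handleCommitLakeTableSnapshotV2(...)`.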
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -947,11 +949,53 @@ message PbModifyColumn {
optional int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3
}
-
-
message PbDescribeConfig {
required string config_key = 1;
optional string config_value = 2;
required string config_source = 3;
}
+message PbLakeTableSnapshotMetadata {
+ required int64 table_id = 1;
+ required int64 snapshot_id = 2;
+ required string tiered_bucket_offsets_file_path = 3;
+ optional string readable_bucket_offsets_file_path = 4;
+}
+
+message PbLakeTableSnapshotInfo {
+ optional int64 table_id = 1;
+ required int64 snapshot_id = 2;
+ repeated PbLakeTableOffsetForBucket buckets_req = 3;
+ // add table path to reduce get table_path by table id
+ optional PbTablePath table_path = 4;
+}
+
+message PbLakeTableOffsetForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+ // Deprecated: log_start_offset is no longer used. Field number 3 is reserved for protocol compatibility.
+ // optional int64 log_start_offset = 3;
+ optional int64 log_end_offset = 4;
+ // Deprecated: partition_name is no longer used. Field number 5 is reserved for protocol compatibility.
+ // optional string partition_name = 5;
+ optional int64 max_timestamp = 6;
+}
+
+message PbPrepareCommitLakeTableRespForTable {
+ optional string lake_table_bucket_offsets_path = 1;
+ optional int32 error_code = 2;
+ optional string error_message = 3;
+}
+
+message PbTableBucketOffsets {
Review Comment:
Rename to `PbTableOffsets`? This makes it clearer that it is a set of offsets for a table.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -44,99 +46,96 @@ public LakeTableHelper(ZooKeeperClient zkClient, String
remoteDataDir) {
}
/**
- * Upserts a lake table snapshot for the given table.
- *
- * <p>This method merges the new snapshot with the existing one (if any)
and stores it (data in
- * remote file, the remote file path in ZK).
+ * Upserts a lake table snapshot for the given table, stored in v1 format.
Note: this method is
+ * just for back compatibility.
*
* @param tableId the table ID
- * @param tablePath the table path
* @param lakeTableSnapshot the new snapshot to upsert
* @throws Exception if the operation fails
*/
- public void upsertLakeTable(
- long tableId, TablePath tablePath, LakeTableSnapshot
lakeTableSnapshot)
+ public void upsertLakeTable(long tableId, LakeTableSnapshot
lakeTableSnapshot)
throws Exception {
Optional<LakeTable> optPreviousLakeTable =
zkClient.getLakeTable(tableId);
// Merge with previous snapshot if exists
if (optPreviousLakeTable.isPresent()) {
- lakeTableSnapshot =
- mergeLakeTable(
-
optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot);
+ TableBucketOffsets tableBucketOffsets =
+ mergeTableBucketOffsets(
+ optPreviousLakeTable.get(),
+ new TableBucketOffsets(
+ tableId,
lakeTableSnapshot.getBucketLogEndOffset()));
+ lakeTableSnapshot = new LakeTableSnapshot(tableId,
tableBucketOffsets.getOffsets());
}
+ zkClient.upsertLakeTable(
+ tableId, new LakeTable(lakeTableSnapshot),
optPreviousLakeTable.isPresent());
+ }
- // store the lake table snapshot into a file
- FsPath lakeTableSnapshotFsPath =
- storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot);
-
- LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata =
- new LakeTable.LakeSnapshotMetadata(
- lakeTableSnapshot.getSnapshotId(),
- // use the lake table snapshot file as the tiered
offsets file since
- // the table snapshot file will contain the tiered log
end offsets
- lakeTableSnapshotFsPath,
- // currently, readableOffsetsFilePath is always same
with
- // tieredOffsetsFilePath, but in the future we'll
commit a readable offsets
- // separately to mark what the readable offsets are
for a snapshot since
- // in paimon dv table, tiered log end offsets is not
same with readable
- // offsets
- lakeTableSnapshotFsPath);
-
- // currently, we keep only one lake snapshot metadata in zk,
- // todo: in solve paimon dv union read issue #2121, we'll keep
multiple lake snapshot
- // metadata
+ public void addLakeTableSnapshotMetadata(
+ long tableId, LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata)
throws Exception {
+ Optional<LakeTable> optPreviousLakeTable =
zkClient.getLakeTable(tableId);
+ List<LakeTable.LakeSnapshotMetadata> previousLakeSnapshotMetadatas =
null;
+ if (optPreviousLakeTable.isPresent()) {
+ previousLakeSnapshotMetadatas =
optPreviousLakeTable.get().getLakeSnapshotMetadatas();
+ }
LakeTable lakeTable = new LakeTable(lakeSnapshotMetadata);
try {
zkClient.upsertLakeTable(tableId, lakeTable,
optPreviousLakeTable.isPresent());
} catch (Exception e) {
LOG.warn("Failed to upsert lake table snapshot to zk.", e);
- // discard the new lake snapshot metadata
- lakeSnapshotMetadata.discard();
throw e;
}
+ // currently, we keep only one lake snapshot metadata in zk,
+ // todo: in solve paimon dv union read issue #2121, we'll keep
multiple lake snapshot
+ // metadata
+ // discard previous lake snapshot metadata
+ if (previousLakeSnapshotMetadatas != null) {
+
previousLakeSnapshotMetadatas.forEach(LakeTable.LakeSnapshotMetadata::discard);
+ }
+ }
+
+ public TableBucketOffsets upsertTableBucketOffsets(
+ long tableId, TableBucketOffsets newTableBucketOffsets) throws
Exception {
+ Optional<LakeTable> optPreviousLakeTable =
zkClient.getLakeTable(tableId);
+ // Merge with previous snapshot if exists
if (optPreviousLakeTable.isPresent()) {
- // discard previous latest lake snapshot
- LakeTable.LakeSnapshotMetadata previousLakeSnapshotMetadata =
- optPreviousLakeTable.get().getLatestLakeSnapshotMetadata();
- if (previousLakeSnapshotMetadata != null) {
- previousLakeSnapshotMetadata.discard();
- }
+ return mergeTableBucketOffsets(optPreviousLakeTable.get(),
newTableBucketOffsets);
}
+ return newTableBucketOffsets;
}
- private LakeTableSnapshot mergeLakeTable(
- LakeTableSnapshot previousLakeTableSnapshot, LakeTableSnapshot
newLakeTableSnapshot) {
- // Merge current snapshot with previous one since the current snapshot
request
+ private TableBucketOffsets mergeTableBucketOffsets(
+ LakeTable previousLakeTable, TableBucketOffsets
newTableBucketOffsets)
+ throws Exception {
+ // Merge current with previous one since the current request
// may not carry all buckets for the table. It typically only carries
buckets
// that were written after the previous commit.
// merge log end offsets, current will override the previous
Map<TableBucket, Long> bucketLogEndOffset =
- new
HashMap<>(previousLakeTableSnapshot.getBucketLogEndOffset());
-
bucketLogEndOffset.putAll(newLakeTableSnapshot.getBucketLogEndOffset());
-
- return new LakeTableSnapshot(newLakeTableSnapshot.getSnapshotId(),
bucketLogEndOffset);
+ new
HashMap<>(previousLakeTable.getLatestTableSnapshot().getBucketLogEndOffset());
+ bucketLogEndOffset.putAll(newTableBucketOffsets.getOffsets());
+ return new TableBucketOffsets(newTableBucketOffsets.getTableId(),
bucketLogEndOffset);
}
- private FsPath storeLakeTableSnapshot(
- long tableId, TablePath tablePath, LakeTableSnapshot
lakeTableSnapshot)
- throws Exception {
- // get the remote file path to store the lake table snapshot
information
- FsPath remoteLakeTableSnapshotManifestPath =
- FlussPaths.remoteLakeTableSnapshotManifestPath(remoteDataDir,
tablePath, tableId);
+ public FsPath storeLakeTableBucketOffsets(
Review Comment:
Rename to `storeLakeTableOffsetsFile` to make it clear that it stores the offsets to remote files.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1224,6 +1225,48 @@ private <T> void
processAccessContext(AccessContextEvent<T> event) {
private void tryProcessCommitLakeTableSnapshot(
CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+ CommitLakeTableSnapshotData commitLakeTableSnapshotData =
+ commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
+ if
(commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas().isEmpty()) {
+ handleCommitLakeTableSnapshotV1(commitLakeTableSnapshotEvent,
callback);
+ } else {
+ Map<Long, LakeTable.LakeSnapshotMetadata> lakeSnapshotMetadatas =
+
commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas();
+ ioExecutor.execute(
+ () -> {
+ try {
+ CommitLakeTableSnapshotResponse response =
+ new CommitLakeTableSnapshotResponse();
+ for (Map.Entry<Long,
LakeTable.LakeSnapshotMetadata>
+ lakeSnapshotMetadataEntry :
lakeSnapshotMetadatas.entrySet()) {
+ PbCommitLakeTableSnapshotRespForTable
tableResp =
+ response.addTableResp();
+ long tableId =
lakeSnapshotMetadataEntry.getKey();
+ tableResp.setTableId(tableId);
+ try {
+
lakeTableHelper.addLakeTableSnapshotMetadata(
+ tableId,
lakeSnapshotMetadataEntry.getValue());
+ } catch (Exception e) {
+ ApiError error = ApiError.fromThrowable(e);
+ tableResp.setError(error.error().code(),
error.message());
+ }
+ }
+ coordinatorEventManager.put(
+ new NotifyLakeTableOffsetEvent(
+
commitLakeTableSnapshotData.getLakeTableSnapshot(),
+ commitLakeTableSnapshotData
+
.getTableBucketsMaxTieredTimestamp()));
Review Comment:
We should move the "putting notify event" into the try block. Otherwise,
when the zk register fails, we shouldn't notify the event.
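Roughly (a sketch only; the end of the lambda is cut off in this hunk, so the `callback` completion and the catch below are assumptions about the surrounding code):

```java
ioExecutor.execute(
        () -> {
            try {
                CommitLakeTableSnapshotResponse response = new CommitLakeTableSnapshotResponse();
                // ... register each table's lake snapshot metadata in ZK,
                // recording per-table errors in the response ...

                // notify only after the ZK registration above has gone through
                coordinatorEventManager.put(
                        new NotifyLakeTableOffsetEvent(
                                commitLakeTableSnapshotData.getLakeTableSnapshot(),
                                commitLakeTableSnapshotData.getTableBucketsMaxTieredTimestamp()));
                callback.complete(response);
            } catch (Throwable t) {
                callback.completeExceptionally(t);
            }
        });
```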
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java:
##########
@@ -112,17 +120,19 @@ public List<LakeSnapshotMetadata>
getLakeSnapshotMetadatas() {
*
* @return the LakeTableSnapshot
*/
- public LakeTableSnapshot getLatestTableSnapshot() throws Exception {
+ public LakeTableSnapshot getLatestTableSnapshot() throws IOException {
Review Comment:
Consider renaming this method to `getOrReadLatestTableSnapshot` to
**explicitly indicate that it may perform I/O operations** (e.g., reading from
remote storage) when the snapshot isn't already available in memory. This makes
the method’s behavior clearer to callers and improves code readability.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -44,99 +46,96 @@ public LakeTableHelper(ZooKeeperClient zkClient, String
remoteDataDir) {
}
/**
- * Upserts a lake table snapshot for the given table.
- *
- * <p>This method merges the new snapshot with the existing one (if any)
and stores it (data in
- * remote file, the remote file path in ZK).
+ * Upserts a lake table snapshot for the given table, stored in v1 format.
Note: this method is
+ * just for back compatibility.
*
* @param tableId the table ID
- * @param tablePath the table path
* @param lakeTableSnapshot the new snapshot to upsert
* @throws Exception if the operation fails
*/
- public void upsertLakeTable(
- long tableId, TablePath tablePath, LakeTableSnapshot
lakeTableSnapshot)
+ public void upsertLakeTable(long tableId, LakeTableSnapshot
lakeTableSnapshot)
throws Exception {
Optional<LakeTable> optPreviousLakeTable =
zkClient.getLakeTable(tableId);
// Merge with previous snapshot if exists
if (optPreviousLakeTable.isPresent()) {
- lakeTableSnapshot =
- mergeLakeTable(
-
optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot);
+ TableBucketOffsets tableBucketOffsets =
+ mergeTableBucketOffsets(
+ optPreviousLakeTable.get(),
+ new TableBucketOffsets(
+ tableId,
lakeTableSnapshot.getBucketLogEndOffset()));
+ lakeTableSnapshot = new LakeTableSnapshot(tableId,
tableBucketOffsets.getOffsets());
}
+ zkClient.upsertLakeTable(
+ tableId, new LakeTable(lakeTableSnapshot),
optPreviousLakeTable.isPresent());
+ }
- // store the lake table snapshot into a file
- FsPath lakeTableSnapshotFsPath =
- storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot);
-
- LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata =
- new LakeTable.LakeSnapshotMetadata(
- lakeTableSnapshot.getSnapshotId(),
- // use the lake table snapshot file as the tiered
offsets file since
- // the table snapshot file will contain the tiered log
end offsets
- lakeTableSnapshotFsPath,
- // currently, readableOffsetsFilePath is always same
with
- // tieredOffsetsFilePath, but in the future we'll
commit a readable offsets
- // separately to mark what the readable offsets are
for a snapshot since
- // in paimon dv table, tiered log end offsets is not
same with readable
- // offsets
- lakeTableSnapshotFsPath);
-
- // currently, we keep only one lake snapshot metadata in zk,
- // todo: in solve paimon dv union read issue #2121, we'll keep
multiple lake snapshot
- // metadata
+ public void addLakeTableSnapshotMetadata(
+ long tableId, LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata)
throws Exception {
+ Optional<LakeTable> optPreviousLakeTable =
zkClient.getLakeTable(tableId);
+ List<LakeTable.LakeSnapshotMetadata> previousLakeSnapshotMetadatas =
null;
+ if (optPreviousLakeTable.isPresent()) {
+ previousLakeSnapshotMetadatas =
optPreviousLakeTable.get().getLakeSnapshotMetadatas();
+ }
LakeTable lakeTable = new LakeTable(lakeSnapshotMetadata);
try {
zkClient.upsertLakeTable(tableId, lakeTable,
optPreviousLakeTable.isPresent());
} catch (Exception e) {
LOG.warn("Failed to upsert lake table snapshot to zk.", e);
- // discard the new lake snapshot metadata
- lakeSnapshotMetadata.discard();
throw e;
}
+ // currently, we keep only one lake snapshot metadata in zk,
+ // todo: in solve paimon dv union read issue #2121, we'll keep
multiple lake snapshot
+ // metadata
+ // discard previous lake snapshot metadata
+ if (previousLakeSnapshotMetadatas != null) {
+
previousLakeSnapshotMetadatas.forEach(LakeTable.LakeSnapshotMetadata::discard);
+ }
+ }
+
+ public TableBucketOffsets upsertTableBucketOffsets(
Review Comment:
It sounds like this method will `upsert` bucket offsets into ZK (very similar to other methods like `upsertLakeTable`). However, it doesn't. It would be better to move the logic into `org.apache.fluss.server.RpcServiceBase#prepareCommitLakeTableSnapshot`.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java:
##########
@@ -18,168 +18,97 @@
package org.apache.fluss.server.zk.data.lake;
-import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.metadata.TableBucket;
import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.fluss.utils.json.JsonDeserializer;
-import org.apache.fluss.utils.json.JsonSerdeUtils;
import org.apache.fluss.utils.json.JsonSerializer;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
-
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
-import static org.apache.fluss.utils.Preconditions.checkState;
/**
* Json serializer and deserializer for {@link LakeTableSnapshot}.
*
- * <p>This serde supports two storage format versions:
+ * <p><b>Note:</b> This class is primarily used for backward compatibility to
deserialize legacy
+ * version 1 lake snapshot data stored in ZooKeeper. The current storage
format (version 2) stores
+ * only file paths in ZooKeeper, with actual snapshot data stored in remote
files. This serde is
+ * used by {@link LakeTableJsonSerde} to handle version 1 format
deserialization.
+ *
+ * <p>The version 1 format stores the full {@link LakeTableSnapshot} data
directly in the ZooKeeper
+ * node, which includes:
*
* <ul>
- * <li>Version 1 (legacy): Each bucket object contains full information
including repeated
- * partition names and partition_id in each bucket entry.
- * <li>Version 2 (current): Compact format that uses different property keys
for partitioned and
- * non-partitioned tables to simplify deserialization:
- * <ul>
- * <li>Non-partition table uses "bucket_offsets": [100, 200, 300],
where array index
- * represents bucket id (0, 1, 2) and value represents
log_end_offset. For buckets
- * without end offset, -1 is written. Missing bucket ids in the
sequence are also filled
- * with -1.
- * <li>Partition table uses "partition_bucket_offsets": {"1": [100,
200], "2": [300, 400]},
- * where key is partition id, array index represents bucket id (0,
1) and value
- * represents log_end_offset. For buckets without end offset, -1
is written. Missing
- * bucket ids in the sequence are also filled with -1.
- * </ul>
- * During deserialization, values of -1 are ignored and not added to the
bucket log end offset
- * map.
+ * <li>version: 1
+ * <li>snapshot_id: the snapshot ID
+ * <li>table_id: the table ID (derived from the first bucket)
+ * <li>buckets: array of bucket objects, each containing bucket_id, optional
partition_id, and
+ * log_end_offset
* </ul>
+ *
+ * @see LakeTableJsonSerde for the current format (version 2) that uses this
serde for legacy
+ * compatibility
*/
public class LakeTableSnapshotJsonSerde
Review Comment:
Consider renaming this to `LakeTableSnapshotLegacyJsonSerde` (currently there are too many `LakeXxxSerde` classes).
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -449,25 +449,27 @@ message NotifyRemoteLogOffsetsRequest {
message NotifyRemoteLogOffsetsResponse {
}
-message CommitLakeTableSnapshotRequest {
- repeated PbLakeTableSnapshotInfo tables_req = 1;
+message PrepareCommitLakeTableSnapshotRequest {
Review Comment:
Rename to `PrepareLakeTableSnapshotRequest`; `prepare` and `commit` are two different phases.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java:
##########
@@ -61,28 +61,35 @@ public class LakeTableJsonSerde implements
JsonSerializer<LakeTable>, JsonDeseri
@Override
public void serialize(LakeTable lakeTable, JsonGenerator generator) throws
IOException {
- generator.writeStartObject();
- generator.writeNumberField(VERSION_KEY, CURRENT_VERSION);
-
- generator.writeArrayFieldStart(LAKE_SNAPSHOTS);
- for (LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata :
- checkNotNull(lakeTable.getLakeSnapshotMetadatas())) {
+ // if lake table snapshot metadata is null, it must be version 1
+ if (lakeTable.getLakeSnapshotMetadatas() == null) {
+ // Version 1: ZK node contains full snapshot data, use
LakeTableSnapshotJsonSerde
+ LakeTableSnapshotJsonSerde.INSTANCE.serialize(
+ lakeTable.getLatestTableSnapshot(), generator);
+ } else {
generator.writeStartObject();
+ generator.writeNumberField(VERSION_KEY, CURRENT_VERSION);
+
+ generator.writeArrayFieldStart(LAKE_SNAPSHOTS);
+ for (LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata :
Review Comment:
It would be better to extract separate `serializeV1()` and `serializeV2()` methods to improve maintainability and readability. The same applies to `deserialize()`.
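For example (a sketch based only on the code in this hunk; the V2 loop body is elided and the closing `writeEnd*` calls are the obvious counterparts of the start calls):

```java
@Override
public void serialize(LakeTable lakeTable, JsonGenerator generator) throws IOException {
    // if lake table snapshot metadata is null, it must be version 1
    if (lakeTable.getLakeSnapshotMetadatas() == null) {
        serializeV1(lakeTable, generator);
    } else {
        serializeV2(lakeTable, generator);
    }
}

private void serializeV1(LakeTable lakeTable, JsonGenerator generator) throws IOException {
    // Version 1: ZK node contains the full snapshot data, delegate to LakeTableSnapshotJsonSerde
    LakeTableSnapshotJsonSerde.INSTANCE.serialize(lakeTable.getLatestTableSnapshot(), generator);
}

private void serializeV2(LakeTable lakeTable, JsonGenerator generator) throws IOException {
    generator.writeStartObject();
    generator.writeNumberField(VERSION_KEY, CURRENT_VERSION);
    generator.writeArrayFieldStart(LAKE_SNAPSHOTS);
    // ... write each LakeSnapshotMetadata entry as in the existing loop ...
    generator.writeEndArray();
    generator.writeEndObject();
}
```

And, analogously, `deserializeV1()` / `deserializeV2()` on the read path.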
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -44,99 +46,96 @@ public LakeTableHelper(ZooKeeperClient zkClient, String
remoteDataDir) {
}
/**
- * Upserts a lake table snapshot for the given table.
- *
- * <p>This method merges the new snapshot with the existing one (if any)
and stores it (data in
- * remote file, the remote file path in ZK).
+ * Upserts a lake table snapshot for the given table, stored in v1 format.
Note: this method is
+ * just for back compatibility.
*
* @param tableId the table ID
- * @param tablePath the table path
* @param lakeTableSnapshot the new snapshot to upsert
* @throws Exception if the operation fails
*/
- public void upsertLakeTable(
- long tableId, TablePath tablePath, LakeTableSnapshot
lakeTableSnapshot)
+ public void upsertLakeTable(long tableId, LakeTableSnapshot
lakeTableSnapshot)
Review Comment:
Rename to `registerLakeTableSnapshotV1()` or `registerLakeTableSnapshotLegacy()` to make it clear that this is a legacy operation.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -694,6 +708,47 @@ public CompletableFuture<ControlledShutdownResponse>
controlledShutdown(
return response;
}
+ @Override
+ public CompletableFuture<PrepareCommitLakeTableSnapshotResponse>
prepareCommitLakeTableSnapshot(
Review Comment:
move this method before `commitLakeTableSnapshot`.
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -947,11 +949,53 @@ message PbModifyColumn {
optional int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3
}
-
-
message PbDescribeConfig {
required string config_key = 1;
optional string config_value = 2;
required string config_source = 3;
}
+message PbLakeTableSnapshotMetadata {
+ required int64 table_id = 1;
+ required int64 snapshot_id = 2;
+ required string tiered_bucket_offsets_file_path = 3;
+ optional string readable_bucket_offsets_file_path = 4;
+}
+
+message PbLakeTableSnapshotInfo {
+ optional int64 table_id = 1;
+ required int64 snapshot_id = 2;
+ repeated PbLakeTableOffsetForBucket buckets_req = 3;
+ // add table path to reduce get table_path by table id
+ optional PbTablePath table_path = 4;
+}
+
+message PbLakeTableOffsetForBucket {
+ optional int64 partition_id = 1;
+ required int32 bucket_id = 2;
+ // Deprecated: log_start_offset is no longer used. Field number 3 is reserved for protocol compatibility.
+ // optional int64 log_start_offset = 3;
+ optional int64 log_end_offset = 4;
+ // Deprecated: partition_name is no longer used. Field number 5 is reserved for protocol compatibility.
+ // optional string partition_name = 5;
+ optional int64 max_timestamp = 6;
+}
+
+message PbPrepareCommitLakeTableRespForTable {
+ optional string lake_table_bucket_offsets_path = 1;
+ optional int32 error_code = 2;
+ optional string error_message = 3;
Review Comment:
Let's follow the convention of putting `error_code` and `error_message` as the first two fields, because we will add more fields at the end.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -44,99 +46,96 @@ public LakeTableHelper(ZooKeeperClient zkClient, String
remoteDataDir) {
}
/**
- * Upserts a lake table snapshot for the given table.
- *
- * <p>This method merges the new snapshot with the existing one (if any)
and stores it (data in
- * remote file, the remote file path in ZK).
+ * Upserts a lake table snapshot for the given table, stored in v1 format.
Note: this method is
+ * just for back compatibility.
*
* @param tableId the table ID
- * @param tablePath the table path
* @param lakeTableSnapshot the new snapshot to upsert
* @throws Exception if the operation fails
*/
- public void upsertLakeTable(
- long tableId, TablePath tablePath, LakeTableSnapshot
lakeTableSnapshot)
+ public void upsertLakeTable(long tableId, LakeTableSnapshot
lakeTableSnapshot)
throws Exception {
Optional<LakeTable> optPreviousLakeTable =
zkClient.getLakeTable(tableId);
// Merge with previous snapshot if exists
if (optPreviousLakeTable.isPresent()) {
- lakeTableSnapshot =
- mergeLakeTable(
-
optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot);
+ TableBucketOffsets tableBucketOffsets =
+ mergeTableBucketOffsets(
+ optPreviousLakeTable.get(),
+ new TableBucketOffsets(
+ tableId,
lakeTableSnapshot.getBucketLogEndOffset()));
+ lakeTableSnapshot = new LakeTableSnapshot(tableId,
tableBucketOffsets.getOffsets());
}
+ zkClient.upsertLakeTable(
+ tableId, new LakeTable(lakeTableSnapshot),
optPreviousLakeTable.isPresent());
+ }
- // store the lake table snapshot into a file
- FsPath lakeTableSnapshotFsPath =
- storeLakeTableSnapshot(tableId, tablePath, lakeTableSnapshot);
-
- LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata =
- new LakeTable.LakeSnapshotMetadata(
- lakeTableSnapshot.getSnapshotId(),
- // use the lake table snapshot file as the tiered
offsets file since
- // the table snapshot file will contain the tiered log
end offsets
- lakeTableSnapshotFsPath,
- // currently, readableOffsetsFilePath is always same
with
- // tieredOffsetsFilePath, but in the future we'll
commit a readable offsets
- // separately to mark what the readable offsets are
for a snapshot since
- // in paimon dv table, tiered log end offsets is not
same with readable
- // offsets
- lakeTableSnapshotFsPath);
-
- // currently, we keep only one lake snapshot metadata in zk,
- // todo: in solve paimon dv union read issue #2121, we'll keep
multiple lake snapshot
- // metadata
+ public void addLakeTableSnapshotMetadata(
Review Comment:
Currently, the method names in `LakeTableHelper` are quite ambiguous. It's hard to tell from the name alone what each method actually does (e.g., whether it interacts with ZooKeeper, reads/writes to OSS, or performs only local operations).
To improve clarity and maintainability, I suggest renaming them to be more descriptive of their behavior. For example, rename this method to `registerLakeTableSnapshotV2` or `registerLakeTableSnapshot()`; `register` is a more common verb to reflect that it is a `zkClient` operation.
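For example, a naming summary of the suggestions in this review (signatures only; the parameters of the last method are illustrative because the hunk cuts them off):

```java
// Hypothetical naming sketch for LakeTableHelper, not actual code in the PR.
interface LakeTableHelperNaming {
    // was upsertLakeTable: registers the legacy v1 snapshot in ZK
    void registerLakeTableSnapshotV1(long tableId, LakeTableSnapshot lakeTableSnapshot) throws Exception;

    // was addLakeTableSnapshotMetadata: registers the v2 snapshot metadata in ZK
    void registerLakeTableSnapshot(long tableId, LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata)
            throws Exception;

    // was storeLakeTableBucketOffsets: stores the bucket offsets file in remote storage
    FsPath storeLakeTableOffsetsFile(long tableId, TablePath tablePath, TableBucketOffsets tableBucketOffsets)
            throws Exception;
}
```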
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -449,25 +449,27 @@ message NotifyRemoteLogOffsetsRequest {
message NotifyRemoteLogOffsetsResponse {
}
-message CommitLakeTableSnapshotRequest {
- repeated PbLakeTableSnapshotInfo tables_req = 1;
+message PrepareCommitLakeTableSnapshotRequest {
+ repeated PbTableBucketOffsets bucket_offsets = 1;
}
-message PbLakeTableSnapshotInfo {
- optional int64 table_id = 1;
- required int64 snapshot_id = 2;
- repeated PbLakeTableOffsetForBucket buckets_req = 3;
+message PrepareCommitLakeTableSnapshotResponse {
+ repeated PbPrepareCommitLakeTableRespForTable prepare_commit_lake_table_resp = 1;
Review Comment:
ditto
##########
fluss-server/src/test/java/org/apache/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java:
##########
@@ -41,104 +34,37 @@ class LakeTableSnapshotJsonSerdeTest extends
JsonSerdeTestBase<LakeTableSnapshot
@Override
protected LakeTableSnapshot[] createObjects() {
- // Test case 1: Empty snapshot
- LakeTableSnapshot lakeTableSnapshot1 = new LakeTableSnapshot(1L,
Collections.emptyMap());
- // Test case 2: Non-partition table with consecutive bucket ids (0, 1,
2)
long tableId = 4;
+
Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
- bucketLogEndOffset.put(new TableBucket(tableId, 0), 100L);
- bucketLogEndOffset.put(new TableBucket(tableId, 1), 200L);
- bucketLogEndOffset.put(new TableBucket(tableId, 2), 300L);
- LakeTableSnapshot lakeTableSnapshot2 = new LakeTableSnapshot(2,
bucketLogEndOffset);
+ bucketLogEndOffset.put(new TableBucket(tableId, 1), 3L);
+ bucketLogEndOffset.put(new TableBucket(tableId, 2), 4L);
+
+ LakeTableSnapshot lakeTableSnapshot1 = new LakeTableSnapshot(2,
bucketLogEndOffset);
- // Test case 3: Non-partition table with missing bucket ids (0, 2, 4 -
missing 1 and 3)
tableId = 5;
- bucketLogEndOffset = new HashMap<>();
- bucketLogEndOffset.put(new TableBucket(tableId, 0), 100L);
- bucketLogEndOffset.put(new TableBucket(tableId, 2), 300L);
- bucketLogEndOffset.put(new TableBucket(tableId, 4), 500L);
- LakeTableSnapshot lakeTableSnapshot3 = new LakeTableSnapshot(3,
bucketLogEndOffset);
- // Test case 4: Partition table with consecutive bucket ids
- tableId = 6;
bucketLogEndOffset = new HashMap<>();
- bucketLogEndOffset.put(new TableBucket(tableId, 1L, 0), 100L);
- bucketLogEndOffset.put(new TableBucket(tableId, 1L, 1), 200L);
- bucketLogEndOffset.put(new TableBucket(tableId, 2L, 0), 300L);
- bucketLogEndOffset.put(new TableBucket(tableId, 2L, 1), 400L);
- LakeTableSnapshot lakeTableSnapshot4 = new LakeTableSnapshot(4,
bucketLogEndOffset);
+ bucketLogEndOffset.put(new TableBucket(tableId, 1L, 1), 3L);
+ bucketLogEndOffset.put(new TableBucket(tableId, 2L, 1), 4L);
- // Test case 5: Partition table with missing bucket ids
- tableId = 7;
- bucketLogEndOffset = new HashMap<>();
- bucketLogEndOffset.put(new TableBucket(tableId, 1L, 0), 100L);
- bucketLogEndOffset.put(new TableBucket(tableId, 1L, 2), 300L); //
missing bucket 1
- bucketLogEndOffset.put(new TableBucket(tableId, 2L, 1), 400L);
- bucketLogEndOffset.put(new TableBucket(tableId, 2L, 3), 600L); //
missing bucket 0 and 2
- LakeTableSnapshot lakeTableSnapshot5 = new LakeTableSnapshot(5,
bucketLogEndOffset);
+ LakeTableSnapshot lakeTableSnapshot2 = new LakeTableSnapshot(3,
bucketLogEndOffset);
return new LakeTableSnapshot[] {
- lakeTableSnapshot1,
- lakeTableSnapshot2,
- lakeTableSnapshot3,
- lakeTableSnapshot4,
- lakeTableSnapshot5,
+ lakeTableSnapshot1, lakeTableSnapshot2,
};
}
@Override
protected String[] expectedJsons() {
- // Version 2 format (uses different property keys):
- // - Non-partition table: "bucket_offsets": [100, 200, 300], array
index = bucket id,
- // value = log_end_offset. Missing buckets are filled with -1.
- // - Partition table: "partition_bucket_offsets": {"1": [100, 200],
"2": [300, 400]},
- // key = partition id, array index = bucket id, value =
log_end_offset. Missing buckets
- // are filled with -1.
return new String[] {
- // Test case 1: Empty snapshot
- "{\"version\":2,\"snapshot_id\":1}",
- // Test case 2: Non-partition table with consecutive bucket ids
[0, 1, 2]
-
"{\"version\":2,\"snapshot_id\":2,\"table_id\":4,\"bucket_offsets\":[100,200,300]}",
- // Test case 3: Non-partition table with missing bucket ids [0,
-1, 2, -1, 4]
-
"{\"version\":2,\"snapshot_id\":3,\"table_id\":5,\"bucket_offsets\":[100,-1,300,-1,500]}",
- // Test case 4: Partition table with consecutive bucket ids
- "{\"version\":2,\"snapshot_id\":4,\"table_id\":6,"
- +
"\"partition_bucket_offsets\":{\"1\":[100,200],\"2\":[300,400]}}",
- // Test case 5: Partition table with missing bucket ids
- "{\"version\":2,\"snapshot_id\":5,\"table_id\":7,"
- +
"\"partition_bucket_offsets\":{\"1\":[100,-1,300],\"2\":[-1,400,-1,600]}}"
+ "{\"version\":1,\"snapshot_id\":2,\"table_id\":4,"
+ + "\"buckets\":[{\"bucket_id\":2,\"log_end_offset\":4},"
+ + "{\"bucket_id\":1,\"log_end_offset\":3}]}",
+ "{\"version\":1,\"snapshot_id\":3,\"table_id\":5,"
+ +
"\"buckets\":[{\"partition_id\":1,\"bucket_id\":1,\"log_end_offset\":3},"
+ +
"{\"partition_id\":2,\"bucket_id\":1,\"log_end_offset\":4}]}"
};
}
-
- @Test
- void testBackwardCompatibility() {
Review Comment:
We still need this test for backward compatibility with the `max_timestamp` and `log_start_offset` fields.
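A minimal sketch of what such a test could look like (the legacy JSON keys `log_start_offset` and `max_timestamp` are assumptions derived from the deprecated proto fields, and `JsonSerdeUtils.readValue` is assumed to be the helper that was previously imported by the serde; adjust to the real v1 payload):

```java
@Test
void testBackwardCompatibility() throws Exception {
    // legacy v1 payload that still carries the now-deprecated per-bucket fields
    String legacyJson =
            "{\"version\":1,\"snapshot_id\":2,\"table_id\":4,"
                    + "\"buckets\":[{\"bucket_id\":1,\"log_start_offset\":0,"
                    + "\"log_end_offset\":3,\"max_timestamp\":100}]}";
    LakeTableSnapshot snapshot =
            JsonSerdeUtils.readValue(
                    legacyJson.getBytes(StandardCharsets.UTF_8),
                    LakeTableSnapshotJsonSerde.INSTANCE);
    // deprecated fields must be tolerated and the log end offset still deserialized
    assertThat(snapshot.getBucketLogEndOffset()).containsEntry(new TableBucket(4L, 1), 3L);
}
```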
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]