luoyuxia commented on code in PR #2223:
URL: https://github.com/apache/fluss/pull/2223#discussion_r2645666268
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -119,24 +176,28 @@ private LakeTableSnapshot mergeLakeTable(
return new LakeTableSnapshot(newLakeTableSnapshot.getSnapshotId(),
bucketLogEndOffset);
}
- private FsPath storeLakeTableSnapshot(
+ public FsPath storeLakeTableSnapshot(
long tableId, TablePath tablePath, LakeTableSnapshot
lakeTableSnapshot)
throws Exception {
- // get the remote file path to store the lake table snapshot
information
- FsPath remoteLakeTableSnapshotManifestPath =
- FlussPaths.remoteLakeTableSnapshotManifestPath(remoteDataDir,
tablePath, tableId);
+ // get the remote file path to store the lake table snapshot offset
information
+ FsPath remoteLakeTableSnapshotOffsetPath =
+ FlussPaths.remoteLakeTableSnapshotOffsetPath(remoteDataDir,
tablePath, tableId);
// check whether the parent directory exists, if not, create the
directory
- FileSystem fileSystem =
remoteLakeTableSnapshotManifestPath.getFileSystem();
- if
(!fileSystem.exists(remoteLakeTableSnapshotManifestPath.getParent())) {
- fileSystem.mkdirs(remoteLakeTableSnapshotManifestPath.getParent());
+ FileSystem fileSystem =
remoteLakeTableSnapshotOffsetPath.getFileSystem();
+ if (!fileSystem.exists(remoteLakeTableSnapshotOffsetPath.getParent()))
{
+ fileSystem.mkdirs(remoteLakeTableSnapshotOffsetPath.getParent());
}
- // serialize table snapshot to json bytes, and write to file
- byte[] jsonBytes =
LakeTableSnapshotJsonSerde.toJson(lakeTableSnapshot);
+ // serialize table offsets to json bytes, and write to file
+ byte[] jsonBytes =
+ JsonSerdeUtils.writeValueAsBytes(
+ new TableBucketOffsets(tableId,
lakeTableSnapshot.getBucketLogEndOffset()),
+ TableBucketOffsetsJsonSerde.INSTANCE);
+
try (FSDataOutputStream outputStream =
fileSystem.create(
- remoteLakeTableSnapshotManifestPath,
FileSystem.WriteMode.OVERWRITE)) {
+ remoteLakeTableSnapshotOffsetPath,
FileSystem.WriteMode.OVERWRITE)) {
outputStream.write(jsonBytes);
}
- return remoteLakeTableSnapshotManifestPath;
+ return remoteLakeTableSnapshotOffsetPath;
}
Review Comment:
`LakeUpsertHelper` will clean it up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org