Copilot commented on code in PR #2223:
URL: https://github.com/apache/fluss/pull/2223#discussion_r2645628123
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java:
##########
@@ -59,53 +85,196 @@ public void open() {
                         metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
     }
-    void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
+    String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets)
+            throws IOException {
+        PbPrepareCommitLakeTableRespForTable prepareCommitResp = null;
+        Exception exception = null;
         try {
-            CommitLakeTableSnapshotRequest request =
-                    toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
-            coordinatorGateway.commitLakeTableSnapshot(request).get();
+            PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
+                    toPrepareCommitLakeTableSnapshotRequest(tableId, tablePath, logEndOffsets);
+            PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
+                    coordinatorGateway
+                            .prepareCommitLakeTableSnapshot(prepareCommitLakeTableSnapshotRequest)
+                            .get();
+            List<PbPrepareCommitLakeTableRespForTable> pbPrepareCommitLakeTableRespForTables =
+                    prepareCommitLakeTableSnapshotResponse.getPrepareCommitLakeTableRespsList();
+            checkState(pbPrepareCommitLakeTableRespForTables.size() == 1);
+            prepareCommitResp = pbPrepareCommitLakeTableRespForTables.get(0);
+            if (prepareCommitResp.hasErrorCode()) {
+                exception = ApiError.fromErrorMessage(prepareCommitResp).exception();
+            }
         } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
             throw new IOException(
                     String.format(
-                            "Fail to commit table lake snapshot %s to Fluss.",
-                            flussTableLakeSnapshot),
-                    ExceptionUtils.stripExecutionException(e));
+                            "Fail to prepare commit table lake snapshot for %s to Fluss.",
+                            tablePath),
+                    ExceptionUtils.stripExecutionException(exception));
         }
+        return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();
Review Comment:
If the RPC succeeds but the response carries an error, only the error fields
of `prepareCommitResp` are set. Calling `getLakeTableSnapshotFilePath()` on it
at line 117 can then throw a NullPointerException because the file path field
is unset. Throw the error before accessing the file path, or guard the access.
```suggestion
        if (prepareCommitResp == null) {
            throw new IOException(
                    String.format(
                            "Prepare commit response is null for table %s; no lake snapshot path returned.",
                            tablePath));
        }
        String lakeTableSnapshotFilePath = prepareCommitResp.getLakeTableSnapshotFilePath();
        if (lakeTableSnapshotFilePath == null) {
            throw new IOException(
                    String.format(
                            "Prepare commit response for table %s does not contain a lake table snapshot file path.",
                            tablePath));
        }
        return lakeTableSnapshotFilePath;
```
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -119,24 +176,28 @@ private LakeTableSnapshot mergeLakeTable(
         return new LakeTableSnapshot(newLakeTableSnapshot.getSnapshotId(), bucketLogEndOffset);
     }
-    private FsPath storeLakeTableSnapshot(
+    public FsPath storeLakeTableSnapshot(
             long tableId, TablePath tablePath, LakeTableSnapshot lakeTableSnapshot)
             throws Exception {
-        // get the remote file path to store the lake table snapshot information
-        FsPath remoteLakeTableSnapshotManifestPath =
-                FlussPaths.remoteLakeTableSnapshotManifestPath(remoteDataDir, tablePath, tableId);
+        // get the remote file path to store the lake table snapshot offset information
+        FsPath remoteLakeTableSnapshotOffsetPath =
+                FlussPaths.remoteLakeTableSnapshotOffsetPath(remoteDataDir, tablePath, tableId);
         // check whether the parent directory exists, if not, create the directory
-        FileSystem fileSystem = remoteLakeTableSnapshotManifestPath.getFileSystem();
-        if (!fileSystem.exists(remoteLakeTableSnapshotManifestPath.getParent())) {
-            fileSystem.mkdirs(remoteLakeTableSnapshotManifestPath.getParent());
+        FileSystem fileSystem = remoteLakeTableSnapshotOffsetPath.getFileSystem();
+        if (!fileSystem.exists(remoteLakeTableSnapshotOffsetPath.getParent())) {
+            fileSystem.mkdirs(remoteLakeTableSnapshotOffsetPath.getParent());
         }
-        // serialize table snapshot to json bytes, and write to file
-        byte[] jsonBytes = LakeTableSnapshotJsonSerde.toJson(lakeTableSnapshot);
+        // serialize table offsets to json bytes, and write to file
+        byte[] jsonBytes =
+                JsonSerdeUtils.writeValueAsBytes(
+                        new TableBucketOffsets(tableId, lakeTableSnapshot.getBucketLogEndOffset()),
+                        TableBucketOffsetsJsonSerde.INSTANCE);
+
         try (FSDataOutputStream outputStream =
                 fileSystem.create(
-                        remoteLakeTableSnapshotManifestPath, FileSystem.WriteMode.OVERWRITE)) {
+                        remoteLakeTableSnapshotOffsetPath, FileSystem.WriteMode.OVERWRITE)) {
             outputStream.write(jsonBytes);
         }
-        return remoteLakeTableSnapshotManifestPath;
+        return remoteLakeTableSnapshotOffsetPath;
     }
Review Comment:
The method writes each snapshot to a new file path containing a random UUID,
but nothing in this change removes the files written by earlier snapshots. Over
time these files will accumulate in the metadata directory and may cause
storage issues. Consider adding a cleanup strategy for old snapshot offset
files.
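
One possible shape for such a cleanup, as a minimal sketch only: keep the file
that is still referenced plus the N most recently modified files, and delete
the rest. The class `OffsetFileCleaner` and its `cleanup` method are
hypothetical and not part of Fluss, and the sketch uses `java.nio.file` on a
local directory as a stand-in for the `FileSystem`/`FsPath` abstraction used in
the PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not a Fluss class): retention-based cleanup of offset files. */
final class OffsetFileCleaner {

    private OffsetFileCleaner() {}

    /**
     * Deletes regular files directly under {@code dir}, except {@code inUse} and the
     * {@code keepLatest} most recently modified files. Returns the deleted paths.
     */
    static List<Path> cleanup(Path dir, Path inUse, int keepLatest) throws IOException {
        if (keepLatest < 0) {
            throw new IllegalArgumentException("keepLatest must be >= 0");
        }
        List<Path> files;
        try (Stream<Path> stream = Files.list(dir)) {
            files = stream.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        // Sort newest first so that the first keepLatest entries are the ones retained.
        files.sort(
                Comparator.comparingLong((Path p) -> p.toFile().lastModified()).reversed());

        List<Path> deleted = new ArrayList<>();
        for (int i = keepLatest; i < files.size(); i++) {
            Path candidate = files.get(i);
            if (candidate.equals(inUse)) {
                // Never delete the file that the current metadata still points to.
                continue;
            }
            Files.delete(candidate);
            deleted.add(candidate);
        }
        return deleted;
    }
}
```

Calling `OffsetFileCleaner.cleanup(dir, currentPath, 1)` after a successful
commit would keep the referenced file and the newest file and remove everything
else. Whether retention should be count-based or age-based, and where the
cleanup should run, is a design decision for the PR author.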
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java:
##########
@@ -61,28 +61,35 @@ public class LakeTableJsonSerde implements JsonSerializer<LakeTable>, JsonDeseri
     @Override
     public void serialize(LakeTable lakeTable, JsonGenerator generator) throws IOException {
-        generator.writeStartObject();
-        generator.writeNumberField(VERSION_KEY, CURRENT_VERSION);
-
-        generator.writeArrayFieldStart(LAKE_SNAPSHOTS);
-        for (LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata :
-                checkNotNull(lakeTable.getLakeSnapshotMetadatas())) {
+        // if lake table snapshot is null, it must be version 1
+        if (lakeTable.getLakeTableSnapshot() != null) {
Review Comment:
The comment contradicts the condition it describes. When
`lakeTable.getLakeTableSnapshot() != null`, the table uses the V1 format (full
snapshot data in ZK), but the comment says "if lake table snapshot is null, it
must be version 1". The comment should be corrected to say "if lake table
snapshot is not null, it must be version 1".
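
To illustrate the intended reading, here is a minimal sketch of the version
dispatch with the corrected comment. `LakeTableSketch`, its fields, and
`formatVersion` are hypothetical stand-ins for the PR's `LakeTable`, and the
assumption that the other branch corresponds to version 2 is not confirmed by
the diff shown here.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for LakeTable; names and fields are illustrative only. */
final class LakeTableSketch {
    // Non-null only when the full snapshot is stored inline (assumed V1 layout).
    private final Long inlineSnapshotId;
    // File paths of snapshot metadata (assumed layout of the later version).
    private final List<String> snapshotFilePaths;

    private LakeTableSketch(Long inlineSnapshotId, List<String> snapshotFilePaths) {
        this.inlineSnapshotId = inlineSnapshotId;
        this.snapshotFilePaths = snapshotFilePaths;
    }

    static LakeTableSketch withInlineSnapshot(long snapshotId) {
        return new LakeTableSketch(snapshotId, Collections.emptyList());
    }

    static LakeTableSketch withFilePaths(List<String> paths) {
        return new LakeTableSketch(null, paths);
    }

    /** Picks the serialization format version for the given table. */
    static int formatVersion(LakeTableSketch table) {
        // if the inline snapshot is not null, it must be version 1
        return table.inlineSnapshotId != null ? 1 : 2;
    }
}
```

The comment and the condition now state the same thing, so a reader does not
have to work out which of the two is authoritative.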