Copilot commented on code in PR #2223:
URL: https://github.com/apache/fluss/pull/2223#discussion_r2646367008

##########
fluss-common/src/test/java/org/apache/fluss/utils/json/TableBucketOffsetsJsonSerdeTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.utils.json;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test for {@link TableBucketOffsetsJsonSerde}. */
+class TableBucketOffsetsJsonSerdeTest extends JsonSerdeTestBase<TableBucketOffsets> {

Review Comment:
   Unused class: TableBucketOffsetsJsonSerdeTest is not referenced within this codebase. If it is not used as an external API, it should be removed.

##########
fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java:
##########
@@ -27,14 +28,32 @@
 /** The data for request {@link CommitLakeTableSnapshotRequest}. */
 public class CommitLakeTableSnapshotData {
+    /**
+     * Since 0.9, this field is only used to allow the coordinator to send requests to tablet
+     * servers, enabling tablet servers to report metrics about synchronized log end offsets. In the
+     * future, we plan to have the tiering service directly report metrics, and this field will be
+     * removed.
+     */

Review Comment:
   The comment on line 34 says "this field will be removed", but the Javadoc above says the field "will be fully removed". Additionally, the comment says the field is "only used" for notification, but the field is still documented in the constructor parameter comments on lines 31-36. Consider making these comments consistent about when and how the field will be deprecated.

##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1224,6 +1225,48 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
     private void tryProcessCommitLakeTableSnapshot(
             CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
             CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+        CommitLakeTableSnapshotData commitLakeTableSnapshotData =
+                commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
+        if (commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas().isEmpty()) {
+            handleCommitLakeTableSnapshotV1(commitLakeTableSnapshotEvent, callback);
+        } else {
+            Map<Long, LakeTable.LakeSnapshotMetadata> lakeSnapshotMetadatas =
+                    commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas();
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            CommitLakeTableSnapshotResponse response =
+                                    new CommitLakeTableSnapshotResponse();
+                            for (Map.Entry<Long, LakeTable.LakeSnapshotMetadata>
+                                    lakeSnapshotMetadataEntry : lakeSnapshotMetadatas.entrySet()) {
+                                PbCommitLakeTableSnapshotRespForTable tableResp =
+                                        response.addTableResp();
+                                long tableId = lakeSnapshotMetadataEntry.getKey();
+                                tableResp.setTableId(tableId);
+                                try {
+                                    lakeTableHelper.addLakeTableSnapshotMetadata(
+                                            tableId, lakeSnapshotMetadataEntry.getValue());
+                                } catch (Exception e) {
+                                    ApiError error = ApiError.fromThrowable(e);
+                                    tableResp.setError(error.error().code(), error.message());

Review Comment:
   If the coordinator fails to add the lake table snapshot metadata, the file written in the prepare phase is never cleaned up.
   Consider tracking prepared files and implementing cleanup on failure, or document that cleanup will happen through other mechanisms (e.g., garbage collection).

   ```suggestion
                                       tableResp.setError(error.error().code(), error.message());
                                       // Note: Any cleanup of prepared lake table snapshot files on failure
                                       // is expected to be handled by LakeTableHelper or other background
                                       // mechanisms (for example, garbage collection of unreferenced files).
                                       LOG.warn(
                                               "Failed to add lake table snapshot metadata for table {}. "
                                                       + "Prepared snapshot files may be cleaned up by background "
                                                       + "cleanup or garbage collection.",
                                               tableId,
                                               e);
   ```

##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -323,33 +284,45 @@ private void checkFlussNotMissingLakeSnapshot(
         // known lake snapshot, which means the data already has been committed to lake,
         // not to commit to lake to avoid data duplicated
         if (missingCommittedSnapshot != null) {
-            if (missingCommittedSnapshot.getSnapshotProperties() == null
-                    || missingCommittedSnapshot
-                                    .getSnapshotProperties()
-                                    .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)
-                            == null) {
+            String lakeSnapshotOffsetPath =
+                    missingCommittedSnapshot
+                            .getSnapshotProperties()
+                            .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+
+            // should only will happen in v0.7 which won't put offsets info
+            // to properties
+            if (lakeSnapshotOffsetPath == null) {
                 throw new IllegalStateException(
                         String.format(
-                                "Missing required log offsets property '%s' in lake snapshot %d for table: ‘tablePath=%s, tableId=%d’. "
-                                        + "This property is required to commit the missing snapshot to Fluss. "
-                                        + "The snapshot may have been created by an older version of Fluss that did not store this information, "
-                                        + "or the snapshot properties may be corrupted.",
+                                "Can't find %s field from snapshot property.",
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
+            }
+
+            // the fluss-offsets will be a json string if it's tiered by v0.8,
+            // since this code path should be rare, we do not consider backward compatibility
+            // and throw IllegalStateException directly
+            String trimmedPath = lakeSnapshotOffsetPath.trim();
+            if (trimmedPath.startsWith("{")) {

Review Comment:
   The trimmedPath.startsWith("{") check is fragile. A legitimate file path could theoretically start with '{'. Consider a more robust check, such as attempting to parse as JSON or checking for a scheme/protocol indicator in the path string.

##########
fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java:
##########
@@ -591,4 +607,45 @@ protected MetadataResponse processMetadataRequest(
         return buildMetadataResponse(
                 coordinatorServer, aliveTabletServers, tablesMetadata, partitionsMetadata);
     }
+
+    @Override
+    public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
+            PrepareCommitLakeTableSnapshotRequest request) {
+        CompletableFuture<PrepareCommitLakeTableSnapshotResponse> future =
+                new CompletableFuture<>();
+        ioExecutor.submit(
+                () -> {
+                    PrepareCommitLakeTableSnapshotResponse response =
+                            new PrepareCommitLakeTableSnapshotResponse();
+                    try {
+                        for (PbTableBucketOffsets bucketOffsets : request.getBucketOffsetsList()) {
+                            PbPrepareCommitLakeTableRespForTable
+                                    pbPrepareCommitLakeTableRespForTable =
+                                            response.addPrepareCommitLakeTableResp();
+                            try {
+                                // upsert lake table snapshot, need to merge the snapshot with
+                                // previous latest snapshot
+                                TableBucketOffsets tableBucketOffsets =
+                                        lakeTableHelper.upsertTableBucketOffsets(
+                                                bucketOffsets.getTableId(),
+                                                toTableBucketOffsets(bucketOffsets));
+                                TablePath tablePath =
+                                        toTablePath(bucketOffsets.getTablePath());
+                                FsPath fsPath =
+                                        lakeTableHelper.storeLakeTableBucketOffsets(
+                                                tablePath, tableBucketOffsets);
+                                pbPrepareCommitLakeTableRespForTable.setLakeTableBucketOffsetsPath(
+                                        fsPath.toString());
+                            } catch (Exception e) {
+                                Errors error = ApiError.fromThrowable(e).error();
+                                pbPrepareCommitLakeTableRespForTable.setError(
+                                        error.code(), error.message());
+                            }

Review Comment:
   The error handling here only sets an error message for individual table failures, but if an exception occurs at line 645, the entire response will fail. This means a failure in one table could prevent other tables from being processed. Consider catching exceptions inside the loop to allow partial success.

##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java:
##########
@@ -283,18 +285,18 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
         verifyLakeSnapshot(
                 tablePath,
                 tableId,
-                2,
-                getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
+                0,
+                expectedLogEndOffsets,
                 String.format(
                         "The current Fluss's lake snapshot %s is less than lake actual snapshot %d committed by Fluss for table: {tablePath=%s, tableId=%d},"
                                 + " missing snapshot: %s.",
                         null,
-                        mockCommittedSnapshot.getLakeSnapshotId(),
+                        mockMissingCommittedLakeSnapshot.getLakeSnapshotId(),
                         tablePath,
                         tableId,
-                        mockCommittedSnapshot));
+                        mockMissingCommittedLakeSnapshot));

Review Comment:
   The variable is named 'mockMissingCommittedLakeSnapshot' but the comment says 'mockCommittedLakeSnapshot'. For clarity and consistency with the new name, update the comment to reference the correct variable name 'mockMissingCommittedLakeSnapshot'.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
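The first option in the CoordinatorEventProcessor comment (clean up the prepared file when the commit fails) could be sketched as follows. This assumes the coordinator can resolve the path of the file written during the prepare phase for the failing table, which the diff does not show, and it uses `java.nio.file` in place of Fluss's own file-system abstraction; `PreparedFileCleanup` and `commitOrDelete` are hypothetical names, not Fluss APIs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Callable;

/** Hypothetical helper (not part of Fluss): runs a commit step, removing the prepared file on failure. */
final class PreparedFileCleanup {

    private PreparedFileCleanup() {}

    /**
     * Runs {@code commit}. If it throws, deletes {@code preparedFile} on a best-effort basis and
     * rethrows the original exception, so the caller can still report a per-table error.
     */
    static <T> T commitOrDelete(Path preparedFile, Callable<T> commit) throws Exception {
        try {
            return commit.call();
        } catch (Exception commitError) {
            try {
                Files.deleteIfExists(preparedFile);
            } catch (IOException deleteError) {
                // Keep the commit failure as the primary error; a file that cannot be
                // deleted here would still need a background garbage-collection pass.
                commitError.addSuppressed(deleteError);
            }
            throw commitError;
        }
    }
}
```

In that hunk, the `lakeTableHelper.addLakeTableSnapshotMetadata(...)` call would be wrapped in a `Callable` passed to `commitOrDelete`, leaving the existing `catch` to set the per-table error. Eager deletion is only safe if no other snapshot can reference the same file; if that cannot be guaranteed, the garbage-collection option is the safer choice.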
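For the TieringCommitOperator comment, the scheme/protocol idea can be expressed as a small classifier, so the code no longer relies on a single leading character. This is a sketch that assumes the offsets file is referenced either by a scheme-qualified URI (for example `hdfs://...`) or by an absolute local path; `OffsetsPropertyValue` and `Kind` are hypothetical names. A stricter check would also parse the candidate JSON with a real JSON library, which is omitted here to keep the example dependency-free.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not Fluss's API): decides what the offsets property value holds. */
final class OffsetsPropertyValue {

    enum Kind { PATH, JSON, UNKNOWN }

    // URI scheme per RFC 3986: ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ) followed by ':'
    private static final Pattern SCHEME = Pattern.compile("[A-Za-z][A-Za-z0-9+.-]*:");

    private OffsetsPropertyValue() {}

    static Kind classify(String value) {
        if (value == null) {
            return Kind.UNKNOWN;
        }
        String v = value.trim();
        // A scheme-qualified URI ("hdfs://...", "file:/...") or an absolute local path is
        // treated as a file path, even if a later path segment contains '{'.
        if (SCHEME.matcher(v).lookingAt() || v.startsWith("/")) {
            return Kind.PATH;
        }
        // Otherwise require both JSON object delimiters rather than just a leading '{'.
        if (v.startsWith("{") && v.endsWith("}")) {
            return Kind.JSON;
        }
        // Anything else is ambiguous; let the caller fail loudly instead of guessing.
        return Kind.UNKNOWN;
    }
}
```

Returning `UNKNOWN` instead of falling back to one interpretation lets the caller throw an `IllegalStateException` that includes the raw property value.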
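For the RpcServiceBase comment, the partial-success pattern the reviewer asks for amounts to two things: each table gets its own `try`/`catch`, and anything thrown outside the loop still completes the returned future. A generic, hypothetical sketch (`PartialSuccess`, `Result` and `processAll` are illustrative names, and a plain error string stands in for the protobuf error fields):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

/** Hypothetical sketch of per-item error isolation; names are illustrative, not Fluss APIs. */
final class PartialSuccess {

    /** Outcome for one item: error is null on success. */
    static final class Result<R> {
        final R value;
        final String error;

        private Result(R value, String error) {
            this.value = value;
            this.error = error;
        }

        static <R> Result<R> ok(R value) {
            return new Result<>(value, null);
        }

        static <R> Result<R> failed(Throwable t) {
            return new Result<>(null, t.toString());
        }

        boolean isOk() {
            return error == null;
        }
    }

    /**
     * Processes every item on {@code executor}. Each item has its own try/catch, so one failure
     * becomes a per-item error instead of aborting the rest; anything thrown outside the loop
     * completes the future exceptionally rather than leaving it pending.
     */
    static <T, R> CompletableFuture<List<Result<R>>> processAll(
            List<T> items, Function<T, R> step, Executor executor) {
        CompletableFuture<List<Result<R>>> future = new CompletableFuture<>();
        executor.execute(
                () -> {
                    try {
                        List<Result<R>> results = new ArrayList<>(items.size());
                        for (T item : items) {
                            try {
                                results.add(Result.ok(step.apply(item)));
                            } catch (Exception e) {
                                results.add(Result.failed(e));
                            }
                        }
                        future.complete(results);
                    } catch (Throwable t) {
                        future.completeExceptionally(t);
                    }
                });
        return future;
    }
}
```

For example, processing `"1"`, `"x"` and `"3"` with `Integer::parseInt` yields two successful results and one per-item error for `"x"`, and the future still completes normally.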
