Copilot commented on code in PR #2223:
URL: https://github.com/apache/fluss/pull/2223#discussion_r2639835707
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -323,33 +285,45 @@ private void checkFlussNotMissingLakeSnapshot(
         // known lake snapshot, which means the data already has been committed to lake,
         // not to commit to lake to avoid data duplicated
         if (missingCommittedSnapshot != null) {
-            if (missingCommittedSnapshot.getSnapshotProperties() == null
-                    || missingCommittedSnapshot
-                                    .getSnapshotProperties()
-                                    .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)
-                            == null) {
+            String lakeSnapshotOffsetPath =
+                    missingCommittedSnapshot
+                            .getSnapshotProperties()
+                            .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+
+            // should only will happen in v0.7 which won't put offsets info
+            // to properties
+            if (lakeSnapshotOffsetPath == null) {
                 throw new IllegalStateException(
                         String.format(
-                                "Missing required log offsets property '%s' in lake snapshot %d for table: ‘tablePath=%s, tableId=%d’. "
-                                        + "This property is required to commit the missing snapshot to Fluss. "
-                                        + "The snapshot may have been created by an older version of Fluss that did not store this information, "
-                                        + "or the snapshot properties may be corrupted.",
+                                "Can't find %s field from snapshot property.",
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
+            }
+
+            // the fluss-offsets will be a json string if it's tiered be v0.8,
+            // since the code path should happen rarely, not to consider back compatibility
+            // throw IllegalStateException directly
+            String trimmedPath = lakeSnapshotOffsetPath.trim();
+            if (trimmedPath.startsWith("{")) {
+                throw new IllegalStateException(
+                        String.format(
+                                "The %s field in snapshot property is a JSON string (tiered by v0.8), "
+                                        + "which is not supported to restore. Snapshot ID: %d, Table: {tablePath=%s, tableId=%d}.",
                                 FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
                                 missingCommittedSnapshot.getLakeSnapshotId(),
                                 tablePath,
                                 tableId));
             }
Review Comment:
The check for JSON string format at line 306 is fragile. It only checks if
the string starts with an opening brace, which could incorrectly identify
legitimate file paths that happen to start with that character. Consider using
a more robust detection method, such as checking for common path prefixes or
patterns, or attempting to parse it as JSON and catching the exception.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1224,6 +1225,48 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
     private void tryProcessCommitLakeTableSnapshot(
             CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
             CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+        CommitLakeTableSnapshotData commitLakeTableSnapshotData =
+                commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
+        if (commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas().isEmpty()) {
+            handleCommitLakeTableSnapshotV1(commitLakeTableSnapshotEvent, callback);
+        } else {
+            Map<Long, LakeTable.LakeSnapshotMetadata> lakeSnapshotMetadatas =
+                    commitLakeTableSnapshotData.getLakeTableSnapshotMetadatas();
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            CommitLakeTableSnapshotResponse response =
+                                    new CommitLakeTableSnapshotResponse();
+                            for (Map.Entry<Long, LakeTable.LakeSnapshotMetadata>
+                                    lakeSnapshotMetadataEntry : lakeSnapshotMetadatas.entrySet()) {
+                                PbCommitLakeTableSnapshotRespForTable tableResp =
+                                        response.addTableResp();
+                                long tableId = lakeSnapshotMetadataEntry.getKey();
+                                tableResp.setTableId(tableId);
+                                try {
+                                    lakeTableHelper.addLakeTableSnapshotMetadata(
+                                            tableId, lakeSnapshotMetadataEntry.getValue());
+                                } catch (Exception e) {
+                                    ApiError error = ApiError.fromThrowable(e);
+                                    tableResp.setError(error.error().code(), error.message());
+                                }
+                                coordinatorEventManager.put(
+                                        new NotifyLakeTableOffsetEvent(
+                                                commitLakeTableSnapshotData.getLakeTableSnapshot(),
+                                                commitLakeTableSnapshotData
+                                                        .getTableBucketsMaxTieredTimestamp()));
+                            }
Review Comment:
The NotifyLakeTableOffsetEvent should be sent once after processing all
tables, not inside the for loop for each table. This will cause multiple
redundant events to be sent to the coordinator event manager, each containing
the same data for all tables. Move this call outside the for loop, after line
1258.
```suggestion
                            }
                            coordinatorEventManager.put(
                                    new NotifyLakeTableOffsetEvent(
                                            commitLakeTableSnapshotData.getLakeTableSnapshot(),
                                            commitLakeTableSnapshotData
                                                    .getTableBucketsMaxTieredTimestamp()));
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java:
##########
@@ -59,49 +70,149 @@ public void open() {
                         metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
     }

-    void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
+    String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets)
+            throws IOException {
+        PbPrepareCommitLakeTableRespForTable prepareCommitResp = null;
+        Exception exception = null;
         try {
-            CommitLakeTableSnapshotRequest request =
-                    toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
-            coordinatorGateway.commitLakeTableSnapshot(request).get();
+            PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
+                    toPrepareCommitLakeTableSnapshotRequest(tableId, tablePath, logEndOffsets);
+            PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
+                    coordinatorGateway
+                            .prepareCommitLakeTableSnapshot(prepareCommitLakeTableSnapshotRequest)
+                            .get();
+            List<PbPrepareCommitLakeTableRespForTable> pbPrepareCommitLakeTableRespForTables =
+                    prepareCommitLakeTableSnapshotResponse.getPrepareCommitLakeTableRespsList();
+            checkState(pbPrepareCommitLakeTableRespForTables.size() == 1);
+            prepareCommitResp = pbPrepareCommitLakeTableRespForTables.get(0);
+            if (prepareCommitResp.hasErrorCode()) {
+                exception = ApiError.fromErrorMessage(prepareCommitResp).exception();
+            }
         } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
             throw new IOException(
                     String.format(
-                            "Fail to commit table lake snapshot %s to Fluss.",
-                            flussTableLakeSnapshot),
-                    ExceptionUtils.stripExecutionException(e));
+                            "Fail to prepare commit table lake snapshot for %s to Fluss.",
+                            tablePath),
+                    ExceptionUtils.stripExecutionException(exception));
         }
+        return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();
     }

-    public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndOffsets)
+    void commit(
+            long tableId,
+            long lakeSnapshotId,
+            String lakeSnapshotPath,
+            Map<TableBucket, Long> logEndOffsets,
+            Map<TableBucket, Long> logMaxTieredTimestamps)
             throws IOException {
-        // construct lake snapshot to commit to Fluss
-        FlussTableLakeSnapshot flussTableLakeSnapshot =
-                new FlussTableLakeSnapshot(tableId, snapshotId);
-        for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
-            flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
+        Exception exception = null;
+        try {
+            CommitLakeTableSnapshotRequest request =
+                    toCommitLakeTableSnapshotRequest(
+                            tableId,
+                            lakeSnapshotId,
+                            lakeSnapshotPath,
+                            logEndOffsets,
+                            logMaxTieredTimestamps);
+            List<PbCommitLakeTableSnapshotRespForTable> commitLakeTableSnapshotRespForTables =
+                    coordinatorGateway.commitLakeTableSnapshot(request).get().getTableRespsList();
+            checkState(commitLakeTableSnapshotRespForTables.size() == 1);
+            PbCommitLakeTableSnapshotRespForTable commitLakeTableSnapshotRes =
+                    commitLakeTableSnapshotRespForTables.get(0);
+            if (commitLakeTableSnapshotRes.hasErrorCode()) {
+                exception = ApiError.fromErrorMessage(commitLakeTableSnapshotRes).exception();
+            }
+        } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
+            throw new IOException(
+                    String.format(
+                            "Fail to commit table lake snapshot id %d of table %d to Fluss.",
+                            lakeSnapshotId, tableId),
+                    ExceptionUtils.stripExecutionException(exception));
         }
-        commit(flussTableLakeSnapshot);
     }

-    private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
-            FlussTableLakeSnapshot flussTableLakeSnapshot) {
-        CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
-                new CommitLakeTableSnapshotRequest();
+    private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRequest(
+            long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets) {
+        PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
+                new PrepareCommitLakeTableSnapshotRequest();
         PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
-                commitLakeTableSnapshotRequest.addTablesReq();
+                prepareCommitLakeTableSnapshotRequest.addTablesReq();
+        pbLakeTableSnapshotInfo.setTableId(tableId);
-        pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
-        pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
-        for (TableBucket tableBucket : flussTableLakeSnapshot.tableBuckets()) {
+        // in prepare phase, we don't know the snapshot id,
+        // set -1 since the field is required
+        pbLakeTableSnapshotInfo.setSnapshotId(-1L);
+        for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
             PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
                     pbLakeTableSnapshotInfo.addBucketsReq();
-            long endOffset = flussTableLakeSnapshot.getLogEndOffset(tableBucket);
+            TableBucket tableBucket = logEndOffsetEntry.getKey();
+            pbLakeTableSnapshotInfo
+                    .setTablePath()
+                    .setDatabaseName(tablePath.getDatabaseName())
+                    .setTableName(tablePath.getTableName());
Review Comment:
The table path is set repeatedly inside the loop for every bucket, but it
should only be set once per table since all buckets belong to the same table.
Move lines 157-160 outside the for loop to avoid redundant operations and
improve performance.
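A sketch of the hoisted form, reusing the names from the diff above (assuming the `setTablePath()` builder only needs to be populated once per table):
```java
// set table-level fields once; they are identical for every bucket
pbLakeTableSnapshotInfo
        .setTablePath()
        .setDatabaseName(tablePath.getDatabaseName())
        .setTableName(tablePath.getTableName());
for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
    PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
            pbLakeTableSnapshotInfo.addBucketsReq();
    TableBucket tableBucket = logEndOffsetEntry.getKey();
    // per-bucket fields stay inside the loop
    if (tableBucket.getPartitionId() != null) {
        pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
    }
    pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
    pbLakeTableOffsetForBucket.setLogEndOffset(logEndOffsetEntry.getValue());
}
```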
##########
fluss-common/src/main/java/org/apache/fluss/lake/committer/CommittedLakeSnapshot.java:
##########
@@ -27,7 +27,6 @@
 public class CommittedLakeSnapshot {

     private final long lakeSnapshotId;
-
     private final Map<String, String> snapshotProperties;
Review Comment:
Removed blank line between fields. While this is minor, it's inconsistent
with the style in the rest of the class where there was a blank line. Consider
keeping the blank line for consistency.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java:
##########
@@ -323,33 +285,45 @@ private void checkFlussNotMissingLakeSnapshot(
         // known lake snapshot, which means the data already has been committed to lake,
         // not to commit to lake to avoid data duplicated
         if (missingCommittedSnapshot != null) {
-            if (missingCommittedSnapshot.getSnapshotProperties() == null
-                    || missingCommittedSnapshot
-                                    .getSnapshotProperties()
-                                    .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY)
-                            == null) {
+            String lakeSnapshotOffsetPath =
+                    missingCommittedSnapshot
+                            .getSnapshotProperties()
+                            .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+
+            // should only will happen in v0.7 which won't put offsets info
+            // to properties
+            if (lakeSnapshotOffsetPath == null) {
                 throw new IllegalStateException(
                         String.format(
-                                "Missing required log offsets property '%s' in lake snapshot %d for table: ‘tablePath=%s, tableId=%d’. "
-                                        + "This property is required to commit the missing snapshot to Fluss. "
-                                        + "The snapshot may have been created by an older version of Fluss that did not store this information, "
-                                        + "or the snapshot properties may be corrupted.",
+                                "Can't find %s field from snapshot property.",
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY));
+            }
+
+            // the fluss-offsets will be a json string if it's tiered be v0.8,
+            // since the code path should happen rarely, not to consider back compatibility
+            // throw IllegalStateException directly
Review Comment:
Comment contains a typo: 'will be a json string if it's tiered be v0.8'
should be 'will be a json string if it's tiered by v0.8'.
```suggestion
            // the fluss-offsets will be a json string if it's tiered by v0.8,
            // since this code path should be rare, we do not consider backward compatibility
            // and throw IllegalStateException directly
```
##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java:
##########
@@ -372,6 +374,12 @@ public CompletableFuture<DescribeClusterConfigsResponse> describeClusterConfigs(
         throw new UnsupportedOperationException();
     }

+    @Override
+    public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
+            PrepareCommitLakeTableSnapshotRequest request) {
+        return null;
Review Comment:
The method returns null instead of throwing UnsupportedOperationException like other unimplemented methods in this test class (see line 374). For consistency, and to avoid a potential NullPointerException in tests, consider throwing UnsupportedOperationException instead.
```suggestion
throw new UnsupportedOperationException();
```
##########
fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java:
##########
@@ -333,6 +335,12 @@ public CompletableFuture<DescribeClusterConfigsResponse> describeClusterConfigs(
         throw new UnsupportedOperationException();
     }

+    @Override
+    public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
+            PrepareCommitLakeTableSnapshotRequest request) {
+        return null;
Review Comment:
The method returns null instead of throwing UnsupportedOperationException
like other unimplemented methods in this test class (see lines 324, 335). For
consistency and to avoid potential NullPointerException in tests, consider
throwing UnsupportedOperationException instead.
```suggestion
throw new UnsupportedOperationException();
```
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -449,14 +449,48 @@ message NotifyRemoteLogOffsetsRequest {
 message NotifyRemoteLogOffsetsResponse {
 }

+message PrepareCommitLakeTableSnapshotRequest {
+  repeated PbLakeTableSnapshotInfo tables_req = 1;
+}
+
+message PrepareCommitLakeTableSnapshotResponse {
+  repeated PbPrepareCommitLakeTableRespForTable prepare_commit_lake_table_resp = 1;
+}
+
+message PbPrepareCommitLakeTableRespForTable {
+  optional string lake_table_snapshot_file_path = 1;
+  optional int32 error_code = 2;
+  optional string error_message = 3;
+}
+
 message CommitLakeTableSnapshotRequest {
+  // Deprecated: PbLakeTableSnapshotInfo is no longer used for committing lake table snapshots.
+  // Currently, it is only used to allow the coordinator to notify tablet servers about the current
+  // synchronized log end offsets, which are then reported to metrics. In the future, we plan to
+  // have tiering directly report to metrics, and this field will be fully removed.
+  // Still reserve it for protocol compatibility.
   repeated PbLakeTableSnapshotInfo tables_req = 1;
+  // The metadata for lake table snapshots to be committed. Each entry contains the table ID,
+  // snapshot ID, and the file paths where the snapshot data (containing bucket log end offset
+  // information) is stored. The tiered_snapshot_file_path points to the file storing tiered log
+  // end offsets, while readable_snapshot_file_path (if present) points to the file storing readable
+  // log end offsets.
+  repeated PbLakeTableSnapshotMetadata lake_table_snapshot_metadata = 2;
+}
+
+message PbLakeTableSnapshotMetadata {
+  required int64 table_id = 1;
+  required int64 snapshot_id = 2;
+  required string tiered_snapshot_file_path = 3;
+  optional string readable_snapshot_file_path = 4;
Review Comment:
The new `tiered_snapshot_file_path` and `readable_snapshot_file_path` fields
are taken directly from client RPC requests and later used to build `FsPath`
instances that are opened and deleted server-side without any validation (see
`ServerRpcMessageUtils.getCommitLakeTableSnapshotData` →
`LakeTable.LakeSnapshotMetadata` →
`LakeTable.getLatestTableSnapshot`/`discard`). This lets a malicious or
compromised client call `commitLakeTableSnapshot` with arbitrary filesystem
paths (e.g., outside the configured remote data directory) and cause the
coordinator to read and delete any file accessible under its filesystem
credentials. To avoid arbitrary file read/delete, derive these paths on the
server from trusted table metadata or, at minimum, enforce strict validation
that confines them to expected directories and schemes before using them.
```suggestion
  // The metadata for lake table snapshots to be committed. Each entry contains the table ID
  // and snapshot ID for which snapshot data (containing bucket log end offset information) is
  // stored. Implementations should derive the actual filesystem paths for snapshot data on the
  // server side from trusted table metadata instead of relying on client-provided paths.
  //
  // Deprecated: PbLakeTableSnapshotMetadata.tiered_snapshot_file_path and
  // PbLakeTableSnapshotMetadata.readable_snapshot_file_path are deprecated and must not be used
  // by new clients; they will be ignored by secure server implementations and are kept only for
  // protocol compatibility.
  repeated PbLakeTableSnapshotMetadata lake_table_snapshot_metadata = 2;
}

message PbLakeTableSnapshotMetadata {
  required int64 table_id = 1;
  required int64 snapshot_id = 2;
  // Deprecated: this field previously carried a client-specified filesystem path. Servers should
  // instead derive the tiered snapshot file path from trusted metadata and ignore this value.
  required string tiered_snapshot_file_path = 3 [deprecated = true];
  // Deprecated: this field previously carried a client-specified readable snapshot file path.
  // Servers should derive any readable snapshot file path from trusted metadata and ignore this
  // value.
  optional string readable_snapshot_file_path = 4 [deprecated = true];
```
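For the validation fallback mentioned above, a server-side guard might look like the following sketch (`trustedRoot` standing in for the coordinator's configured remote data directory; the method name is illustrative and assumes hierarchical URIs):
```java
import java.net.URI;
import java.util.Objects;

// Illustrative guard: reject client-supplied snapshot paths whose scheme,
// authority, or normalized path falls outside the configured trusted root.
static void validateSnapshotFilePath(String clientPath, URI trustedRoot) {
    URI candidate = URI.create(clientPath).normalize();
    URI root = trustedRoot.normalize();
    String rootPath = root.getPath().endsWith("/") ? root.getPath() : root.getPath() + "/";
    boolean sameScheme = Objects.equals(candidate.getScheme(), root.getScheme());
    boolean sameAuthority = Objects.equals(candidate.getAuthority(), root.getAuthority());
    boolean underRoot = candidate.getPath() != null && candidate.getPath().startsWith(rootPath);
    if (!sameScheme || !sameAuthority || !underRoot) {
        throw new IllegalArgumentException(
                "Rejected lake snapshot file path outside trusted directory: " + clientPath);
    }
}
```
Deriving the path entirely on the server side remains the stronger option; validation like this only narrows the attack surface.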
##########
fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java:
##########
@@ -249,4 +251,10 @@ public CompletableFuture<DescribeClusterConfigsResponse> describeClusterConfigs(
             DescribeClusterConfigsRequest request) {
         return null;
     }
+
+    @Override
+    public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
+            PrepareCommitLakeTableSnapshotRequest request) {
+        return null;
+    }
Review Comment:
The method returns null instead of throwing UnsupportedOperationException like other unimplemented methods in this test class (see line 251). For consistency, and to avoid a potential NullPointerException in tests, consider throwing UnsupportedOperationException instead.
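A sketch of the fix, mirroring the suggestions on the other two test gateways:
```java
@Override
public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
        PrepareCommitLakeTableSnapshotRequest request) {
    // match the convention used by the other unimplemented methods in this class
    throw new UnsupportedOperationException();
}
```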
##########
fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java:
##########
@@ -591,4 +606,44 @@ protected MetadataResponse processMetadataRequest(
         return buildMetadataResponse(
                 coordinatorServer, aliveTabletServers, tablesMetadata, partitionsMetadata);
     }
+
+    @Override
+    public CompletableFuture<PrepareCommitLakeTableSnapshotResponse> prepareCommitLakeTableSnapshot(
+            PrepareCommitLakeTableSnapshotRequest request) {
+        CompletableFuture<PrepareCommitLakeTableSnapshotResponse> future =
+                new CompletableFuture<>();
+        ioExecutor.submit(
+                () -> {
+                    PrepareCommitLakeTableSnapshotResponse response =
+                            new PrepareCommitLakeTableSnapshotResponse();
+                    for (PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo :
+                            request.getTablesReqsList()) {
+                        PbPrepareCommitLakeTableRespForTable pbPrepareCommitLakeTableRespForTable =
+                                response.addPrepareCommitLakeTableResp();
+                        try {
+                            // upsert lake table snapshot, need to merge the snapshot with previous
+                            // latest snapshot
+                            LakeTableSnapshot lakeTableSnapshot =
+                                    lakeTableHelper.upsertLakeTableSnapshot(
+                                            pbLakeTableSnapshotInfo.getTableId(),
+                                            toLakeSnapshot(pbLakeTableSnapshotInfo));
+                            TablePath tablePath =
+                                    toTablePath(pbLakeTableSnapshotInfo.getTablePath());
+                            FsPath fsPath =
+                                    lakeTableHelper.storeLakeTableSnapshot(
+                                            pbLakeTableSnapshotInfo.getTableId(),
+                                            tablePath,
+                                            lakeTableSnapshot);
+                            pbPrepareCommitLakeTableRespForTable.setLakeTableSnapshotFilePath(
+                                    fsPath.toString());
+                        } catch (Exception e) {
+                            Errors error = ApiError.fromThrowable(e).error();
+                            pbPrepareCommitLakeTableRespForTable.setError(
+                                    error.code(), error.message());
+                        }
+                    }
+                    future.complete(response);
Review Comment:
The submit operation doesn't handle exceptions that may occur during the
task execution. If the lambda throws an exception before completing the future,
the exception will be swallowed. Consider wrapping the entire task in a
try-catch block that calls future.completeExceptionally(e) on error to ensure
all exceptions are properly propagated.
```suggestion
                    try {
                        PrepareCommitLakeTableSnapshotResponse response =
                                new PrepareCommitLakeTableSnapshotResponse();
                        for (PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo :
                                request.getTablesReqsList()) {
                            PbPrepareCommitLakeTableRespForTable pbPrepareCommitLakeTableRespForTable =
                                    response.addPrepareCommitLakeTableResp();
                            try {
                                // upsert lake table snapshot, need to merge the snapshot with previous
                                // latest snapshot
                                LakeTableSnapshot lakeTableSnapshot =
                                        lakeTableHelper.upsertLakeTableSnapshot(
                                                pbLakeTableSnapshotInfo.getTableId(),
                                                toLakeSnapshot(pbLakeTableSnapshotInfo));
                                TablePath tablePath =
                                        toTablePath(pbLakeTableSnapshotInfo.getTablePath());
                                FsPath fsPath =
                                        lakeTableHelper.storeLakeTableSnapshot(
                                                pbLakeTableSnapshotInfo.getTableId(),
                                                tablePath,
                                                lakeTableSnapshot);
                                pbPrepareCommitLakeTableRespForTable.setLakeTableSnapshotFilePath(
                                        fsPath.toString());
                            } catch (Exception e) {
                                Errors error = ApiError.fromThrowable(e).error();
                                pbPrepareCommitLakeTableRespForTable.setError(
                                        error.code(), error.message());
                            }
                        }
                        future.complete(response);
                    } catch (Throwable t) {
                        future.completeExceptionally(t);
                    }
```
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -449,14 +449,48 @@ message NotifyRemoteLogOffsetsRequest {
 message NotifyRemoteLogOffsetsResponse {
 }

+message PrepareCommitLakeTableSnapshotRequest {
+  repeated PbLakeTableSnapshotInfo tables_req = 1;
+}
+
+message PrepareCommitLakeTableSnapshotResponse {
+  repeated PbPrepareCommitLakeTableRespForTable prepare_commit_lake_table_resp = 1;
+}
+
+message PbPrepareCommitLakeTableRespForTable {
+  optional string lake_table_snapshot_file_path = 1;
+  optional int32 error_code = 2;
+  optional string error_message = 3;
+}
Review Comment:
The PbPrepareCommitLakeTableRespForTable message is missing a table_id
field. Without this field, clients cannot determine which table each response
corresponds to when processing a batch request with multiple tables. Add a
required int64 table_id field to properly identify the table for each response.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java:
##########
@@ -59,49 +70,149 @@ public void open() {
                         metadataUpdater::getCoordinatorServer, rpcClient, CoordinatorGateway.class);
     }

-    void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
+    String prepareCommit(long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets)
+            throws IOException {
+        PbPrepareCommitLakeTableRespForTable prepareCommitResp = null;
+        Exception exception = null;
         try {
-            CommitLakeTableSnapshotRequest request =
-                    toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
-            coordinatorGateway.commitLakeTableSnapshot(request).get();
+            PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
+                    toPrepareCommitLakeTableSnapshotRequest(tableId, tablePath, logEndOffsets);
+            PrepareCommitLakeTableSnapshotResponse prepareCommitLakeTableSnapshotResponse =
+                    coordinatorGateway
+                            .prepareCommitLakeTableSnapshot(prepareCommitLakeTableSnapshotRequest)
+                            .get();
+            List<PbPrepareCommitLakeTableRespForTable> pbPrepareCommitLakeTableRespForTables =
+                    prepareCommitLakeTableSnapshotResponse.getPrepareCommitLakeTableRespsList();
+            checkState(pbPrepareCommitLakeTableRespForTables.size() == 1);
+            prepareCommitResp = pbPrepareCommitLakeTableRespForTables.get(0);
+            if (prepareCommitResp.hasErrorCode()) {
+                exception = ApiError.fromErrorMessage(prepareCommitResp).exception();
+            }
         } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
             throw new IOException(
                     String.format(
-                            "Fail to commit table lake snapshot %s to Fluss.",
-                            flussTableLakeSnapshot),
-                    ExceptionUtils.stripExecutionException(e));
+                            "Fail to prepare commit table lake snapshot for %s to Fluss.",
+                            tablePath),
+                    ExceptionUtils.stripExecutionException(exception));
         }
+        return checkNotNull(prepareCommitResp).getLakeTableSnapshotFilePath();
     }

-    public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndOffsets)
+    void commit(
+            long tableId,
+            long lakeSnapshotId,
+            String lakeSnapshotPath,
+            Map<TableBucket, Long> logEndOffsets,
+            Map<TableBucket, Long> logMaxTieredTimestamps)
             throws IOException {
-        // construct lake snapshot to commit to Fluss
-        FlussTableLakeSnapshot flussTableLakeSnapshot =
-                new FlussTableLakeSnapshot(tableId, snapshotId);
-        for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
-            flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
+        Exception exception = null;
+        try {
+            CommitLakeTableSnapshotRequest request =
+                    toCommitLakeTableSnapshotRequest(
+                            tableId,
+                            lakeSnapshotId,
+                            lakeSnapshotPath,
+                            logEndOffsets,
+                            logMaxTieredTimestamps);
+            List<PbCommitLakeTableSnapshotRespForTable> commitLakeTableSnapshotRespForTables =
+                    coordinatorGateway.commitLakeTableSnapshot(request).get().getTableRespsList();
+            checkState(commitLakeTableSnapshotRespForTables.size() == 1);
+            PbCommitLakeTableSnapshotRespForTable commitLakeTableSnapshotRes =
+                    commitLakeTableSnapshotRespForTables.get(0);
+            if (commitLakeTableSnapshotRes.hasErrorCode()) {
+                exception = ApiError.fromErrorMessage(commitLakeTableSnapshotRes).exception();
+            }
+        } catch (Exception e) {
+            exception = e;
+        }
+
+        if (exception != null) {
+            throw new IOException(
+                    String.format(
+                            "Fail to commit table lake snapshot id %d of table %d to Fluss.",
+                            lakeSnapshotId, tableId),
+                    ExceptionUtils.stripExecutionException(exception));
         }
-        commit(flussTableLakeSnapshot);
     }

-    private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
-            FlussTableLakeSnapshot flussTableLakeSnapshot) {
-        CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
-                new CommitLakeTableSnapshotRequest();
+    private PrepareCommitLakeTableSnapshotRequest toPrepareCommitLakeTableSnapshotRequest(
+            long tableId, TablePath tablePath, Map<TableBucket, Long> logEndOffsets) {
+        PrepareCommitLakeTableSnapshotRequest prepareCommitLakeTableSnapshotRequest =
+                new PrepareCommitLakeTableSnapshotRequest();
         PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
-                commitLakeTableSnapshotRequest.addTablesReq();
+                prepareCommitLakeTableSnapshotRequest.addTablesReq();
+        pbLakeTableSnapshotInfo.setTableId(tableId);
-        pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
-        pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
-        for (TableBucket tableBucket : flussTableLakeSnapshot.tableBuckets()) {
+        // in prepare phase, we don't know the snapshot id,
+        // set -1 since the field is required
+        pbLakeTableSnapshotInfo.setSnapshotId(-1L);
+        for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
             PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
                     pbLakeTableSnapshotInfo.addBucketsReq();
-            long endOffset = flussTableLakeSnapshot.getLogEndOffset(tableBucket);
+            TableBucket tableBucket = logEndOffsetEntry.getKey();
+            pbLakeTableSnapshotInfo
+                    .setTablePath()
+                    .setDatabaseName(tablePath.getDatabaseName())
+                    .setTableName(tablePath.getTableName());
             if (tableBucket.getPartitionId() != null) {
                 pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
             }
             pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
-            pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
+            pbLakeTableOffsetForBucket.setLogEndOffset(logEndOffsetEntry.getValue());
+        }
+        return prepareCommitLakeTableSnapshotRequest;
+    }
+
+    private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
+            long tableId,
+            long snapshotId,
+            String lakeSnapshotPath,
+            Map<TableBucket, Long> logEndOffsets,
+            Map<TableBucket, Long> logMaxTieredTimestamps) {
+        CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
+                new CommitLakeTableSnapshotRequest();
+
+        // Add lake table snapshot metadata
+        PbLakeTableSnapshotMetadata pbLakeTableSnapshotMetadata =
+                commitLakeTableSnapshotRequest.addLakeTableSnapshotMetadata();
+        pbLakeTableSnapshotMetadata.setSnapshotId(snapshotId);
+        pbLakeTableSnapshotMetadata.setTableId(tableId);
+        // tiered snapshot file path is equal to readable snapshot currently
+        pbLakeTableSnapshotMetadata.setTieredSnapshotFilePath(lakeSnapshotPath);
+        pbLakeTableSnapshotMetadata.setReadableSnapshotFilePath(lakeSnapshotPath);
+
+        // Add PbLakeTableSnapshotInfo for metrics reporting (to notify tablet servers about
+        // synchronized log end offsets and max timestamps)
+        if (!logEndOffsets.isEmpty()) {
+            PbLakeTableSnapshotInfo pbLakeTableSnapshotInfo =
+                    commitLakeTableSnapshotRequest.addTablesReq();
+            for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
+
+                pbLakeTableSnapshotInfo.setTableId(tableId);
+                pbLakeTableSnapshotInfo.setSnapshotId(snapshotId);
Review Comment:
The tableId and snapshotId are set repeatedly inside the loop for every
bucket. Since all buckets in the loop belong to the same table and snapshot,
these fields should be set once before the loop begins, not for each iteration.
Move lines 195-196 outside the for loop.
```suggestion
            pbLakeTableSnapshotInfo.setTableId(tableId);
            pbLakeTableSnapshotInfo.setSnapshotId(snapshotId);
            for (Map.Entry<TableBucket, Long> logEndOffsetEntry : logEndOffsets.entrySet()) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]