Copilot commented on code in PR #2640:
URL: https://github.com/apache/fluss/pull/2640#discussion_r2791280551
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##########
@@ -159,8 +166,43 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI
             return null;
         }
-        // todo: optimize in #2626, if the compacted snapshot exists in zk, we can
-        // skip the follow check
+        LakeSnapshot lastCompactedLakeSnapshot = null;
+
+        try {
+            // Attempt to retrieve the snapshot from Fluss.
+            // This is a blocking call to unwrap the future.
+            lastCompactedLakeSnapshot =
+                    flussAdmin.getLakeSnapshot(tablePath, latestCompactedSnapshot.id()).get();
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+
+            // If the error is anything other than the snapshot simply not existing,
+            // we log a warning but do not interrupt the flow.
+            if (!(cause instanceof LakeTableSnapshotNotExistException)) {
+                LOG.warn(
+                        "Failed to retrieve lake snapshot {} from Fluss. "
+                                + "Will attempt to advance readable snapshot as a fallback.",
+                        latestCompactedSnapshot.id(),
+                        cause);
+            }
+            // If LakeTableSnapshotNotExistException occurs, we silently fall through
+            // as it is an expected case when the snapshot hasn't been recorded yet.
+        }
Review Comment:
The `catch (Exception e)` around `flussAdmin.getLakeSnapshot(...).get()` can
swallow `InterruptedException` (or a wrapped interruption) without restoring
the thread’s interrupted status. In this codebase, blocking future waits
typically catch `InterruptedException`, call
`Thread.currentThread().interrupt()`, and then fail fast (or propagate). Please
handle `InterruptedException` explicitly (and optionally `ExecutionException`
separately), re-interrupt the thread, and avoid continuing with
potentially-cancelled work.
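A minimal, self-contained sketch of the suggested pattern; the helper name `awaitResult` and the wrapping exceptions are illustrative, not Fluss APIs:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class InterruptAwareGet {

    /**
     * Blocking wait that preserves interruption semantics: restore the
     * thread's interrupted status on InterruptedException and fail fast,
     * rather than continuing with potentially-cancelled work.
     */
    static <T> T awaitResult(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException ie) {
            // Restore the flag so upstream callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for lake snapshot", ie);
        } catch (ExecutionException ee) {
            // Unwrap the real failure; production code would special-case the
            // expected "snapshot not exist" cause here and rethrow the rest.
            throw new RuntimeException(ee.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitResult(CompletableFuture.completedFuture(42))); // prints 42
    }
}
```

The key difference from a bare `catch (Exception e)` is that interruption never disappears: the flag is re-set before the method unwinds.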
##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##########
@@ -159,8 +166,43 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI
             return null;
         }
-        // todo: optimize in #2626, if the compacted snapshot exists in zk, we can
-        // skip the follow check
+        LakeSnapshot lastCompactedLakeSnapshot = null;
+
+        try {
+            // Attempt to retrieve the snapshot from Fluss.
+            // This is a blocking call to unwrap the future.
+            lastCompactedLakeSnapshot =
+                    flussAdmin.getLakeSnapshot(tablePath, latestCompactedSnapshot.id()).get();
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+
+            // If the error is anything other than the snapshot simply not existing,
+            // we log a warning but do not interrupt the flow.
+            if (!(cause instanceof LakeTableSnapshotNotExistException)) {
+                LOG.warn(
+                        "Failed to retrieve lake snapshot {} from Fluss. "
+                                + "Will attempt to advance readable snapshot as a fallback.",
+                        latestCompactedSnapshot.id(),
+                        cause);
+            }
+            // If LakeTableSnapshotNotExistException occurs, we silently fall through
+            // as it is an expected case when the snapshot hasn't been recorded yet.
+        }
+
+        // If we successfully retrieved a snapshot, we must validate its integrity.
+        if (lastCompactedLakeSnapshot != null) {
+            // Consistency Check: The ID in Fluss must strictly match the expected compacted ID.
+            // If they differ, it indicates a critical state mismatch in the metadata.
+            checkState(
+                    lastCompactedLakeSnapshot.getSnapshotId() == latestCompactedSnapshot.id(),
+                    "Snapshot ID mismatch detected! Expected: %s, Actual in Fluss: %s",
+                    latestCompactedSnapshot.id(),
+                    lastCompactedLakeSnapshot.getSnapshotId());
+
+            // If the snapshot already exists and is valid, no further action (advancing) is
+            // required.
+            return null;
+        }
Review Comment:
Returning `null` here changes the commit path to
`LakeCommitResult.unknownReadableSnapshot(...)` (see `PaimonLakeCommitter`),
which sets `earliestSnapshotIDToKeep = KEEP_ALL_PREVIOUS (-1)`. That
effectively disables snapshot cleanup for subsequent commits and can cause
unbounded growth of lake snapshot metadata/files. If the intent is to skip
*recomputation* (and/or re-commit) while preserving retention behavior,
consider fetching the already-registered readable snapshot/offsets from Fluss
(e.g., via `getReadableLakeSnapshot` + `getLakeSnapshot` for tiered offsets)
and still returning a `ReadableSnapshotResult`, or introduce a commit result
variant that skips re-committing the readable snapshot but still carries
forward the correct `earliestSnapshotIdToKeep`.
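One way to picture the proposed commit result variant, as a hypothetical sketch (names like `SketchCommitResult` and `alreadyRegistered` are illustrative, not the actual Fluss API):

```java
// Hypothetical shape of a commit result that can skip re-committing the
// readable snapshot while still carrying a retention boundary, instead of
// collapsing to KEEP_ALL_PREVIOUS (-1) and disabling cleanup.
final class SketchCommitResult {
    static final long KEEP_ALL_PREVIOUS = -1L;

    final boolean skipReadableSnapshotCommit;
    final long earliestSnapshotIdToKeep;

    private SketchCommitResult(boolean skipReadableSnapshotCommit, long earliestSnapshotIdToKeep) {
        this.skipReadableSnapshotCommit = skipReadableSnapshotCommit;
        this.earliestSnapshotIdToKeep = earliestSnapshotIdToKeep;
    }

    // Mirrors today's fallback: the readable snapshot is unknown, so every
    // previous snapshot must be retained.
    static SketchCommitResult unknownReadableSnapshot() {
        return new SketchCommitResult(true, KEEP_ALL_PREVIOUS);
    }

    // Proposed variant: skip re-committing the readable snapshot, but keep
    // carrying the retention boundary forward so cleanup still happens.
    static SketchCommitResult alreadyRegistered(long earliestSnapshotIdToKeep) {
        return new SketchCommitResult(true, earliestSnapshotIdToKeep);
    }
}
```

The point of the second factory is that "already registered" and "unknown" are distinguishable states, so the committer never has to conflate skipping work with abandoning retention.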
##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java:
##########
@@ -413,6 +413,22 @@ void testGetReadableSnapshotAndOffsets() throws Exception {
        // snapshot6 safely since we won't need to search for any earlier snapshots to get readable
        // offsets
        assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(6);
+        commitSnapshot(
+                tableId,
+                tablePath,
+                snapshot13,
+                tieredLakeSnapshotEndOffset,
+                retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot13));
+
+        // when the compacted snapshot is already registered in ZK,
+        // getReadableSnapshotAndOffsets skips recomputation and returns null.
+        long snapshot15 = writeAndCommitData(fileStoreTable, Collections.emptyMap());
+        DvTableReadableSnapshotRetriever.ReadableSnapshotResult result15 =
+                retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot15);
+        assertThat(result15)
+                .as(
+                        "Compacted snapshot 13 is already in ZK, should skip recomputation and return null")
+                .isNull();
Review Comment:
This test commits `snapshot13` by calling `commitSnapshot(..., snapshot13,
..., retriveReadableSnapshotAndOffsets(..., snapshot13))`, but
`getReadableSnapshotAndOffsets(snapshot13)` searches for the previous COMPACT
snapshot (before 13) and likely won’t register snapshot 13 as a *readable* lake
snapshot (i.e., with readable offsets). Since the new optimization only checks
existence via `Admin#getLakeSnapshot` (tiered offsets), the test can pass even
if readable offsets for snapshot 13 were never committed, which doesn’t match
the intended scenario (“latest compacted snapshot already registered”). To make
the test validate the real behavior, register snapshot 13 as readable by
committing the tiered snapshot that produced `readableSnapshotAndOffsets` for
snapshot14 (where `readableSnapshotId == 13`), and/or assert via
`flussAdmin.getReadableLakeSnapshot(tablePath)` that snapshot 13 is the latest
readable snapshot before expecting `result15` to be null.
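In pseudocode, the suggested setup might look like the following sketch; it reuses the helper names from the existing test, but the exact signatures and the return shape of `getReadableLakeSnapshot` are assumptions, not verified API:

```
// Sketch only: first register snapshot 13 as *readable* by committing the
// tiered snapshot whose readableSnapshotId == 13 (snapshot14's result),
// then confirm it landed before expecting the skip on snapshot15.
commitSnapshot(tableId, tablePath, snapshot14, tieredLakeSnapshotEndOffset,
        retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot14));
assert latestReadableSnapshotId(flussAdmin.getReadableLakeSnapshot(tablePath)) == 13;
```

With that precondition asserted, a null `result15` actually exercises the "latest compacted snapshot already registered" path rather than the weaker "tiered offsets exist" path.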
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]