Copilot commented on code in PR #2640:
URL: https://github.com/apache/fluss/pull/2640#discussion_r2791280551


##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##########
@@ -159,8 +166,43 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI
             return null;
         }
 
-        // todo: optimize in #2626, if the compacted snapshot exists in zk, we can
-        // skip the follow check
+        LakeSnapshot lastCompactedLakeSnapshot = null;
+
+        try {
+            // Attempt to retrieve the snapshot from Fluss.
+            // This is a blocking call to unwrap the future.
+            lastCompactedLakeSnapshot =
+                    flussAdmin.getLakeSnapshot(tablePath, latestCompactedSnapshot.id()).get();
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+
+            // If the error is anything other than the snapshot simply not existing,
+            // we log a warning but do not interrupt the flow.
+            if (!(cause instanceof LakeTableSnapshotNotExistException)) {
+                LOG.warn(
+                        "Failed to retrieve lake snapshot {} from Fluss. "
+                                + "Will attempt to advance readable snapshot as a fallback.",
+                        latestCompactedSnapshot.id(),
+                        cause);
+            }
+            // If LakeTableSnapshotNotExistException occurs, we silently fall through
+            // as it is an expected case when the snapshot hasn't been recorded yet.
+        }

Review Comment:
   The `catch (Exception e)` around `flussAdmin.getLakeSnapshot(...).get()` can 
swallow `InterruptedException` (or a wrapped interruption) without restoring 
the thread’s interrupted status. In this codebase, blocking future waits 
typically catch `InterruptedException`, call 
`Thread.currentThread().interrupt()`, and then fail fast (or propagate). Please 
handle `InterruptedException` explicitly (and optionally `ExecutionException` 
separately), re-interrupt the thread, and avoid continuing with 
potentially-cancelled work.
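
A minimal sketch of that handling, reusing the identifiers from the diff (the exception type rethrown on interrupt is a placeholder for whatever this codebase conventionally uses):

```java
LakeSnapshot lastCompactedLakeSnapshot = null;
try {
    lastCompactedLakeSnapshot =
            flussAdmin.getLakeSnapshot(tablePath, latestCompactedSnapshot.id()).get();
} catch (InterruptedException e) {
    // Restore the interrupted status and fail fast rather than continuing
    // with potentially-cancelled work.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted while fetching lake snapshot from Fluss.", e);
} catch (ExecutionException e) {
    Throwable cause = ExceptionUtils.stripExecutionException(e);
    if (!(cause instanceof LakeTableSnapshotNotExistException)) {
        LOG.warn(
                "Failed to retrieve lake snapshot {} from Fluss. "
                        + "Will attempt to advance readable snapshot as a fallback.",
                latestCompactedSnapshot.id(),
                cause);
    }
    // LakeTableSnapshotNotExistException is expected when the snapshot
    // hasn't been recorded yet; fall through with the variable still null.
}
```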



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##########
@@ -159,8 +166,43 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI
             return null;
         }
 
-        // todo: optimize in #2626, if the compacted snapshot exists in zk, we can
-        // skip the follow check
+        LakeSnapshot lastCompactedLakeSnapshot = null;
+
+        try {
+            // Attempt to retrieve the snapshot from Fluss.
+            // This is a blocking call to unwrap the future.
+            lastCompactedLakeSnapshot =
+                    flussAdmin.getLakeSnapshot(tablePath, latestCompactedSnapshot.id()).get();
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+
+            // If the error is anything other than the snapshot simply not existing,
+            // we log a warning but do not interrupt the flow.
+            if (!(cause instanceof LakeTableSnapshotNotExistException)) {
+                LOG.warn(
+                        "Failed to retrieve lake snapshot {} from Fluss. "
+                                + "Will attempt to advance readable snapshot as a fallback.",
+                        latestCompactedSnapshot.id(),
+                        cause);
+            }
+            // If LakeTableSnapshotNotExistException occurs, we silently fall through
+            // as it is an expected case when the snapshot hasn't been recorded yet.
+        }
+
+        // If we successfully retrieved a snapshot, we must validate its integrity.
+        if (lastCompactedLakeSnapshot != null) {
+            // Consistency Check: The ID in Fluss must strictly match the expected compacted ID.
+            // If they differ, it indicates a critical state mismatch in the metadata.
+            checkState(
+                    lastCompactedLakeSnapshot.getSnapshotId() == latestCompactedSnapshot.id(),
+                    "Snapshot ID mismatch detected! Expected: %s, Actual in Fluss: %s",
+                    latestCompactedSnapshot.id(),
+                    lastCompactedLakeSnapshot.getSnapshotId());
+
+            // If the snapshot already exists and is valid, no further action (advancing) is
+            // required.
+            return null;
+        }

Review Comment:
   Returning `null` here changes the commit path to 
`LakeCommitResult.unknownReadableSnapshot(...)` (see `PaimonLakeCommitter`), 
which sets `earliestSnapshotIDToKeep = KEEP_ALL_PREVIOUS (-1)`. That 
effectively disables snapshot cleanup for subsequent commits and can cause 
unbounded growth of lake snapshot metadata/files. If the intent is to skip 
*recomputation* (and/or re-commit) while preserving retention behavior, 
consider fetching the already-registered readable snapshot/offsets from Fluss 
(e.g., via `getReadableLakeSnapshot` + `getLakeSnapshot` for tiered offsets) 
and still returning a `ReadableSnapshotResult`, or introduce a commit result 
variant that skips re-committing the readable snapshot but still carries 
forward the correct `earliestSnapshotIdToKeep`.
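
If the first option is taken, one possible shape, sticking to the `Admin` calls named above (the `ReadableSnapshotResult` constructor arguments and the accessors marked below are hypothetical and would need to match the actual classes):

```java
if (lastCompactedLakeSnapshot != null) {
    checkState(
            lastCompactedLakeSnapshot.getSnapshotId() == latestCompactedSnapshot.id(),
            "Snapshot ID mismatch detected! Expected: %s, Actual in Fluss: %s",
            latestCompactedSnapshot.id(),
            lastCompactedLakeSnapshot.getSnapshotId());

    // Instead of returning null (which maps to unknownReadableSnapshot and
    // KEEP_ALL_PREVIOUS), reuse the readable snapshot already registered in
    // Fluss so the committer still carries forward earliestSnapshotIdToKeep.
    // The blocking get() here needs the same interrupt-safe handling as above.
    var readable = flussAdmin.getReadableLakeSnapshot(tablePath).get();
    return new ReadableSnapshotResult(           // hypothetical constructor shape
            readable.getSnapshotId(),            // assumed accessor for the readable id
            lastCompactedLakeSnapshot.getTableBucketsOffset(), // assumed tiered-offsets accessor
            latestCompactedSnapshot.id());       // assumed earliest-snapshot-to-keep argument
}
```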



##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java:
##########
@@ -413,6 +413,22 @@ void testGetReadableSnapshotAndOffsets() throws Exception {
         // snapshot6 safely since we won't need to search for any earlier snapshots to get readable
         // offsets
         assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(6);
+        commitSnapshot(
+                tableId,
+                tablePath,
+                snapshot13,
+                tieredLakeSnapshotEndOffset,
+                retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot13));
+
+        // when the compacted snapshot is already registered in ZK,
+        // getReadableSnapshotAndOffsets skips recomputation and returns null.
+        long snapshot15 = writeAndCommitData(fileStoreTable, Collections.emptyMap());
+        DvTableReadableSnapshotRetriever.ReadableSnapshotResult result15 =
+                retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot15);
+        assertThat(result15)
+                .as(
+                        "Compacted snapshot 13 is already in ZK, should skip recomputation and return null")
+                .isNull();

Review Comment:
   This test commits `snapshot13` by calling `commitSnapshot(..., snapshot13, 
..., retriveReadableSnapshotAndOffsets(..., snapshot13))`, but 
`getReadableSnapshotAndOffsets(snapshot13)` searches for the previous COMPACT 
snapshot (before 13) and likely won’t register snapshot 13 as a *readable* lake 
snapshot (i.e., with readable offsets). Since the new optimization only checks 
existence via `Admin#getLakeSnapshot` (tiered offsets), the test can pass even 
if readable offsets for snapshot 13 were never committed, which doesn’t match 
the intended scenario (“latest compacted snapshot already registered”). To make 
the test validate the real behavior, register snapshot 13 as readable by 
committing the tiered snapshot that produced `readableSnapshotAndOffsets` for 
snapshot14 (where `readableSnapshotId == 13`), and/or assert via 
`flussAdmin.getReadableLakeSnapshot(tablePath)` that snapshot 13 is the latest 
readable snapshot before expecting `result15` to be null.
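
A sketch of the assertion half of that suggestion (the accessor on the readable-snapshot info is an assumption):

```java
// Pin down the precondition before expecting result15 to be null: snapshot 13
// must be the latest *readable* lake snapshot registered in Fluss, not merely
// present via Admin#getLakeSnapshot.
var readableInfo = flussAdmin.getReadableLakeSnapshot(tablePath).get();
assertThat(readableInfo.getSnapshotId()) // assumed accessor
        .as("snapshot 13 should be the latest readable lake snapshot")
        .isEqualTo(13L);

long snapshot15 = writeAndCommitData(fileStoreTable, Collections.emptyMap());
DvTableReadableSnapshotRetriever.ReadableSnapshotResult result15 =
        retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot15);
assertThat(result15).isNull();
```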


