This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 5fd8d9cb6 [lake/paimon] Skip readable snapshot recomputation when 
latest compacted snapshot already in ZK (#2640)
5fd8d9cb6 is described below

commit 5fd8d9cb6fb6821a347b192f77ffa1d0f4697d69
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Feb 12 15:01:22 2026 +0800

    [lake/paimon] Skip readable snapshot recomputation when latest compacted 
snapshot already in ZK (#2640)
---
 .../utils/DvTableReadableSnapshotRetriever.java    | 92 ++++++++++++++++----
 .../DvTableReadableSnapshotRetrieverTest.java      | 98 ++++++++++++++++++----
 2 files changed, 157 insertions(+), 33 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
index 2499a5c44..54c3c7f94 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
@@ -23,11 +23,13 @@ import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.metadata.LakeSnapshot;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
 import org.apache.fluss.lake.committer.LakeCommitResult;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.ExceptionUtils;
 import org.apache.fluss.utils.types.Tuple2;
 
 import org.apache.paimon.CoreOptions;
@@ -107,7 +109,11 @@ public class DvTableReadableSnapshotRetriever implements 
AutoCloseable {
      *
      * <ol>
      *   <li>Find the latest compacted snapshot before the given tiered 
snapshot
-     *   <li>Check which buckets have no L0 files and which have L0 files in 
the compacted snapshot
+     *   <li>Look up Fluss (ZK lake node) to see if this compacted snapshot is 
already registered.
+     *       If it exists, skip recomputing tiered and readable offsets and 
return null (no update
+     *       needed). This avoids redundant work when many APPEND snapshots 
follow a single COMPACT.
+     *   <li>Otherwise, check which buckets have no L0 files and which have L0 
files in the
+     *       compacted snapshot
      *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot (all data is
      *       in base files, safe to advance)
      *   <li>For buckets with L0 files:
@@ -136,6 +142,7 @@ public class DvTableReadableSnapshotRetriever implements 
AutoCloseable {
      * @return a tuple containing the readable snapshot ID (the latest 
compacted snapshot) and a map
      *     of TableBucket to readable offset for all buckets, or null if:
      *     <ul>
+     *       <li>The latest compacted snapshot is already registered in Fluss 
(ZK); no update needed
      *       <li>No compacted snapshot exists before the tiered snapshot
      *       <li>Cannot find the latest snapshot holding flushed L0 files for 
some buckets
      *       <li>Cannot find the previous APPEND snapshot for some buckets
@@ -159,8 +166,44 @@ public class DvTableReadableSnapshotRetriever implements 
AutoCloseable {
             return null;
         }
 
-        // todo: optimize in #2626, if the compacted snapshot exists in zk, we 
can
-        // skip the follow check
+        LakeSnapshot lastCompactedLakeSnapshot = null;
+
+        try {
+            // Attempt to retrieve the snapshot from Fluss.
+            // This is a blocking call to unwrap the future.
+            lastCompactedLakeSnapshot =
+                    flussAdmin.getLakeSnapshot(tablePath, 
latestCompactedSnapshot.id()).get();
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+
+            // If the error is anything other than the snapshot simply not 
existing,
+            // we log a warning but do not interrupt the flow.
+            if (!(cause instanceof LakeTableSnapshotNotExistException)) {
+                LOG.warn(
+                        "Failed to retrieve lake snapshot {} from Fluss. "
+                                + "Will attempt to advance readable snapshot 
as a fallback.",
+                        latestCompactedSnapshot.id(),
+                        cause);
+            }
+            // If LakeTableSnapshotNotExistException occurs, we silently fall 
through
+            // as it is an expected case when the snapshot hasn't been 
recorded yet.
+        }
+
+        // If we successfully retrieved a snapshot, we must validate its 
integrity.
+        if (lastCompactedLakeSnapshot != null) {
+            // Consistency Check: The ID in Fluss must strictly match the 
expected compacted ID.
+            // A mismatch should never happen; if the IDs differ, it indicates a
+            // critical state mismatch in the metadata.
+            checkState(
+                    lastCompactedLakeSnapshot.getSnapshotId() == 
latestCompactedSnapshot.id(),
+                    "Snapshot ID mismatch detected! Expected: %s, Actual in 
Fluss: %s",
+                    latestCompactedSnapshot.id(),
+                    lastCompactedLakeSnapshot.getSnapshotId());
+
+            // If the snapshot already exists and is valid, no further action 
(advancing) is
+            // required.
+            return null;
+        }
 
         Map<TableBucket, Long> readableOffsets = new HashMap<>();
 
@@ -202,6 +245,31 @@ public class DvTableReadableSnapshotRetriever implements 
AutoCloseable {
             }
         }
 
+        Snapshot compactedSnapshotPreviousAppendSnapshot =
+                findPreviousSnapshot(latestCompactedSnapshot.id(), 
Snapshot.CommitKind.APPEND);
+        if (compactedSnapshotPreviousAppendSnapshot == null) {
+            LOG.warn(
+                    "Failed to find a previous APPEND snapshot before 
compacted snapshot {} for table {}. "
+                            + "This prevents retrieving baseline offsets from 
Fluss.",
+                    latestCompactedSnapshot.id(),
+                    tablePath);
+            return null;
+        }
+
+        // Why snapshots are retained: for a compacted snapshot, if a bucket has L0 files, we find
+        // the snapshot that exactly holds those L0 files and use the tiered offset of that
+        // snapshot's previous APPEND as the readable offset (that offset is safe to read). When
+        // the current compacted snapshot has no L0 files in any bucket, no backward traversal is
+        // needed. For any later compacted snapshot, if some bucket has L0 files, the snapshot
+        // that exactly holds them must come after the current compacted snapshot on the
+        // timeline, so its previous APPEND cannot be earlier than the current compacted
+        // snapshot's previous APPEND. Therefore the earliest snapshot we need to keep is the
+        // current compacted snapshot's previous APPEND; set earliestSnapshotIdToKeep to it so it
+        // is not deleted. Earlier snapshots may be safely deleted.
+        if (bucketsWithL0.isEmpty()) {
+            earliestSnapshotIdToKeep = 
compactedSnapshotPreviousAppendSnapshot.id();
+        }
+
         // for all buckets with l0, we need to find the latest compacted 
snapshot which flushed
         // the buckets, the per-bucket offset should be updated to the 
corresponding compacted
         // snapshot offsets
@@ -336,21 +404,11 @@ public class DvTableReadableSnapshotRetriever implements 
AutoCloseable {
             return null;
         }
 
-        // to get the the tiered offset for the readable snapshot,
-        Snapshot previousSnapshot =
-                findPreviousSnapshot(latestCompactedSnapshot.id(), 
Snapshot.CommitKind.APPEND);
-        if (previousSnapshot == null) {
-            LOG.warn(
-                    "Failed to find a previous APPEND snapshot before 
compacted snapshot {} for table {}. "
-                            + "This prevents retrieving baseline offsets from 
Fluss.",
-                    latestCompactedSnapshot.id(),
-                    tablePath);
-            return null;
-        }
-
-        long previousSnapshotId = previousSnapshot.id();
+        // use the tiered offsets of the APPEND snapshot preceding the compacted snapshot as
+        // the compacted snapshot's tiered offsets
         LakeSnapshot tieredLakeSnapshot =
-                getOrFetchLakeSnapshot(previousSnapshotId, 
lakeSnapshotBySnapshotId);
+                getOrFetchLakeSnapshot(
+                        compactedSnapshotPreviousAppendSnapshot.id(), 
lakeSnapshotBySnapshotId);
         if (tieredLakeSnapshot == null) {
             return null;
         }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java
index 912c28285..a5a12e214 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java
@@ -413,6 +413,22 @@ class DvTableReadableSnapshotRetrieverTest {
         // snapshot6 safely since we won't need to search for any earlier 
snapshots to get readable
         // offsets
         
assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(6);
+        commitSnapshot(
+                tableId,
+                tablePath,
+                snapshot14,
+                tieredLakeSnapshotEndOffset,
+                retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, 
snapshot14));
+
+        // when the compacted snapshot is already registered in ZK,
+        // getReadableSnapshotAndOffsets skips recomputation and returns null.
+        long snapshot15 = writeAndCommitData(fileStoreTable, 
Collections.emptyMap());
+        DvTableReadableSnapshotRetriever.ReadableSnapshotResult result15 =
+                retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, 
snapshot15);
+        assertThat(result15)
+                .as(
+                        "Compacted snapshot 13 is already in ZK, should skip 
recomputation and return null")
+                .isNull();
     }
 
     @Test
@@ -615,24 +631,11 @@ class DvTableReadableSnapshotRetrieverTest {
                         bucket0, generateRowsForPartition(partition1, bucket0, 
3, 6));
         long snapshot9 = writeAndCommitData(fileStoreTable, appendRowsP1More);
         tieredLakeSnapshotEndOffset.put(tbP1B0, 6L);
+        // Snapshot 7 is already registered in Fluss cluster, so the retrieve 
result is null
+        // (no update needed).
         readableSnapshotAndOffsets =
                 retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, 
snapshot9);
-        // readable_snapshot = snapshot7 (unchanged, partition1/bucket0 still 
has L0 from snapshot9)
-        // readable_offsets: partition0/bucket0 uses snapshot6's offset (6L),
-        //                   partition0/bucket1 uses snapshot1's offset (3L),
-        //                   partition1/bucket0 uses snapshot3's offset (3L) - 
from snapshot5
-        // compaction
-        
assertThat(readableSnapshotAndOffsets.getReadableSnapshotId()).isEqualTo(snapshot7);
-        expectedReadableOffsets = new HashMap<>();
-        expectedReadableOffsets.put(tbP0B0, 6L);
-        expectedReadableOffsets.put(tbP0B1, 3L);
-        expectedReadableOffsets.put(tbP1B0, 3L);
-        assertThat(readableSnapshotAndOffsets.getReadableOffsets())
-                .isEqualTo(expectedReadableOffsets);
-        // all buckets L0 level has been flushed in snapshot3, we can delete 
all snapshots prior to
-        // snapshot3 safely since we won't need to search for any earlier 
snapshots to get readable
-        // offsets
-        
assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(3);
+        assertThat(readableSnapshotAndOffsets).isNull();
         commitSnapshot(
                 tableId,
                 tablePath,
@@ -669,6 +672,69 @@ class DvTableReadableSnapshotRetrieverTest {
         expectedReadableOffsets.put(tbP1B0, 6L);
         assertThat(readableSnapshotAndOffsets.getReadableOffsets())
                 .isEqualTo(expectedReadableOffsets);
+        commitSnapshot(
+                tableId,
+                tablePath,
+                snapshot11,
+                tieredLakeSnapshotEndOffset,
+                readableSnapshotAndOffsets);
+
+        // Step 9: APPEND snapshot 12 - write more data to partition0, bucket1
+        // Snapshot 12 state:
+        //   ┌─────────────┬─────────┬─────────────────────────────┐
+        //   │ Partition   │ Bucket  │ Files                       │
+        //   ├─────────────┼─────────┼─────────────────────────────┤
+        //   │ partition0  │ bucket0 │ L1: [from s7 (rows 0-5)]    │ ← 
unchanged
+        //   │ partition0  │ bucket1 │ L1: [from s7 (rows 0-2)]    │
+        //   │             │         │ L0: [rows 3-6] ← new        │ ← added 
in snapshot12
+        //   │ partition1  │ bucket0 │ L1: [from s10 (rows 0-5)]   │ ← 
unchanged
+        //   └─────────────┴─────────┴─────────────────────────────┘
+        Map<Integer, List<GenericRow>> appendRowsP0B1 =
+                Collections.singletonMap(
+                        bucket1, generateRowsForPartition(partition0, bucket1, 
3, 7));
+        long snapshot12 = writeAndCommitData(fileStoreTable, appendRowsP0B1);
+        tieredLakeSnapshotEndOffset.put(tbP0B1, 6L);
+        commitSnapshot(
+                tableId,
+                tablePath,
+                snapshot12,
+                tieredLakeSnapshotEndOffset,
+                retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, 
snapshot12));
+
+        // Step 10: COMPACT snapshot 13 - compact partition0, bucket1 again 
(flushes snapshot12's
+        // L0)
+        // Snapshot 13 state (after compacting partition0, bucket1):
+        //   ┌─────────────┬─────────┬─────────────────────────────┐
+        //   │ Partition   │ Bucket  │ Files                       │
+        //   ├─────────────┼─────────┼─────────────────────────────┤
+        //   │ partition0  │ bucket0 │ L1: [from s7 (rows 0-5)]    │ ← 
unchanged
+        //   │ partition0  │ bucket1 │ L1: [merged s7 L1 + s12 L0] │ ← s12's 
L0 flushed to L1
+        //   │             │         │     [rows 0-6 total]        │
+        //   │ partition1  │ bucket0 │ L1: [from s10 (rows 0-5)]   │ ← 
unchanged
+        //   └─────────────┴─────────┴─────────────────────────────┘
+        compactHelper.compactBucket(partition0BinaryRow, bucket1).commit();
+        long snapshot13 = latestSnapshot(fileStoreTable);
+
+        // Create an empty tiered snapshot (snapshot14) to simulate tiered 
snapshot commit
+        long snapshot14 = writeAndCommitData(fileStoreTable, 
Collections.emptyMap());
+
+        readableSnapshotAndOffsets =
+                retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, 
snapshot14);
+        // readable_snapshot = snapshot13
+        // readable_offsets: partition0/bucket0 uses snapshot6's offset (6L),
+        //                   partition0/bucket1 uses snapshot12's offset (6L) 
- flushed in s13,
+        //                   partition1/bucket0 uses snapshot9's offset (6L)
+        
assertThat(readableSnapshotAndOffsets.getReadableSnapshotId()).isEqualTo(snapshot13);
+        expectedReadableOffsets = new HashMap<>();
+        expectedReadableOffsets.put(tbP0B0, 6L);
+        expectedReadableOffsets.put(tbP0B1, 6L);
+        expectedReadableOffsets.put(tbP1B0, 6L);
+        assertThat(readableSnapshotAndOffsets.getReadableOffsets())
+                .isEqualTo(expectedReadableOffsets);
+
+        // After compact 13, no bucket has L0 files in the compacted snapshot, so we only
+        // need to keep the previous append snapshot 12
+        
assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(snapshot12);
     }
 
     private long latestSnapshot(FileStoreTable fileStoreTable) {

Reply via email to