This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5fd8d9cb6 [lake/paimon] Skip readable snapshot recomputation when
latest compacted snapshot already in ZK (#2640)
5fd8d9cb6 is described below
commit 5fd8d9cb6fb6821a347b192f77ffa1d0f4697d69
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Feb 12 15:01:22 2026 +0800
[lake/paimon] Skip readable snapshot recomputation when latest compacted
snapshot already in ZK (#2640)
---
.../utils/DvTableReadableSnapshotRetriever.java | 92 ++++++++++++++++----
.../DvTableReadableSnapshotRetrieverTest.java | 98 ++++++++++++++++++----
2 files changed, 157 insertions(+), 33 deletions(-)
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
index 2499a5c44..54c3c7f94 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java
@@ -23,11 +23,13 @@ import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.lake.committer.LakeCommitResult;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.ExceptionUtils;
import org.apache.fluss.utils.types.Tuple2;
import org.apache.paimon.CoreOptions;
@@ -107,7 +109,11 @@ public class DvTableReadableSnapshotRetriever implements
AutoCloseable {
*
* <ol>
* <li>Find the latest compacted snapshot before the given tiered
snapshot
- * <li>Check which buckets have no L0 files and which have L0 files in
the compacted snapshot
+ * <li>Look up Fluss (ZK lake node) to see if this compacted snapshot is
already registered.
+ * If it exists, skip recomputing tiered and readable offsets and
return null (no update
+ * needed). This avoids redundant work when many APPEND snapshots
follow a single COMPACT.
+ * <li>Otherwise, check which buckets have no L0 files and which have L0
files in the
+ * compacted snapshot
* <li>For buckets without L0 files: use offsets from the latest tiered
snapshot (all data is
* in base files, safe to advance)
* <li>For buckets with L0 files:
@@ -136,6 +142,7 @@ public class DvTableReadableSnapshotRetriever implements
AutoCloseable {
* @return a tuple containing the readable snapshot ID (the latest
compacted snapshot) and a map
* of TableBucket to readable offset for all buckets, or null if:
* <ul>
+ * <li>The latest compacted snapshot is already registered in Fluss
(ZK); no update needed
* <li>No compacted snapshot exists before the tiered snapshot
* <li>Cannot find the latest snapshot holding flushed L0 files for
some buckets
* <li>Cannot find the previous APPEND snapshot for some buckets
@@ -159,8 +166,44 @@ public class DvTableReadableSnapshotRetriever implements
AutoCloseable {
return null;
}
- // todo: optimize in #2626, if the compacted snapshot exists in zk, we
can
- // skip the follow check
+ LakeSnapshot lastCompactedLakeSnapshot = null;
+
+ try {
+ // Attempt to retrieve the snapshot from Fluss.
+ // This is a blocking call to unwrap the future.
+ lastCompactedLakeSnapshot =
+ flussAdmin.getLakeSnapshot(tablePath,
latestCompactedSnapshot.id()).get();
+ } catch (Exception e) {
+ Throwable cause = ExceptionUtils.stripExecutionException(e);
+
+ // If the error is anything other than the snapshot simply not
existing,
+ // we log a warning but do not interrupt the flow.
+ if (!(cause instanceof LakeTableSnapshotNotExistException)) {
+ LOG.warn(
+ "Failed to retrieve lake snapshot {} from Fluss. "
+ + "Will attempt to advance readable snapshot
as a fallback.",
+ latestCompactedSnapshot.id(),
+ cause);
+ }
+ // If LakeTableSnapshotNotExistException occurs, we silently fall
through
+ // as it is an expected case when the snapshot hasn't been
recorded yet.
+ }
+
+ // If we successfully retrieved a snapshot, we must validate its
integrity.
+ if (lastCompactedLakeSnapshot != null) {
+ // Consistency Check: The ID in Fluss must strictly match the
expected compacted ID.
+ // A mismatch should never happen:
+ // if they differ, it indicates a critical state mismatch in the
metadata.
+ checkState(
+ lastCompactedLakeSnapshot.getSnapshotId() ==
latestCompactedSnapshot.id(),
+ "Snapshot ID mismatch detected! Expected: %s, Actual in
Fluss: %s",
+ latestCompactedSnapshot.id(),
+ lastCompactedLakeSnapshot.getSnapshotId());
+
+ // If the snapshot already exists and is valid, no further action
(advancing) is
+ // required.
+ return null;
+ }
Map<TableBucket, Long> readableOffsets = new HashMap<>();
@@ -202,6 +245,31 @@ public class DvTableReadableSnapshotRetriever implements
AutoCloseable {
}
}
+ Snapshot compactedSnapshotPreviousAppendSnapshot =
+ findPreviousSnapshot(latestCompactedSnapshot.id(),
Snapshot.CommitKind.APPEND);
+ if (compactedSnapshotPreviousAppendSnapshot == null) {
+ LOG.warn(
+ "Failed to find a previous APPEND snapshot before
compacted snapshot {} for table {}. "
+ + "This prevents retrieving baseline offsets from
Fluss.",
+ latestCompactedSnapshot.id(),
+ tablePath);
+ return null;
+ }
+
+ // We keep snapshots because for a compacted snapshot, if a bucket has
L0, we find the
+ // snapshot that exactly holds those L0, then use that snapshot's
previous APPEND's tiered
+ // offset as the readable offset (that offset is safe to read). When
the current compacted
+ // snapshot has no L0 in any bucket, we do not traverse; for any later
compact that we would
+ // traverse to (going backwards in time), if some bucket has L0, the
snapshot that exactly
+ // holds that L0 must be after the current compacted snapshot on the
timeline. So that
+ // snapshot's previous APPEND cannot be earlier than the current
compacted snapshot's
+ // previous APPEND. Therefore the minimum snapshot we need to keep is
the current compact's
+ // previous APPEND; set earliestSnapshotIdToKeep to it so it is not
deleted. Earlier
+ // snapshots may be safely deleted.
+ if (bucketsWithL0.isEmpty()) {
+ earliestSnapshotIdToKeep =
compactedSnapshotPreviousAppendSnapshot.id();
+ }
+
// for all buckets with l0, we need to find the latest compacted
snapshot which flushed
// the buckets, the per-bucket offset should be updated to the
corresponding compacted
// snapshot offsets
@@ -336,21 +404,11 @@ public class DvTableReadableSnapshotRetriever implements
AutoCloseable {
return null;
}
- // to get the the tiered offset for the readable snapshot,
- Snapshot previousSnapshot =
- findPreviousSnapshot(latestCompactedSnapshot.id(),
Snapshot.CommitKind.APPEND);
- if (previousSnapshot == null) {
- LOG.warn(
- "Failed to find a previous APPEND snapshot before
compacted snapshot {} for table {}. "
- + "This prevents retrieving baseline offsets from
Fluss.",
- latestCompactedSnapshot.id(),
- tablePath);
- return null;
- }
-
- long previousSnapshotId = previousSnapshot.id();
+ // we use the tiered offsets of the compacted snapshot's previous
APPEND snapshot as the
+ // compacted snapshot's tiered offsets
LakeSnapshot tieredLakeSnapshot =
- getOrFetchLakeSnapshot(previousSnapshotId,
lakeSnapshotBySnapshotId);
+ getOrFetchLakeSnapshot(
+ compactedSnapshotPreviousAppendSnapshot.id(),
lakeSnapshotBySnapshotId);
if (tieredLakeSnapshot == null) {
return null;
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java
index 912c28285..a5a12e214 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java
@@ -413,6 +413,22 @@ class DvTableReadableSnapshotRetrieverTest {
// snapshot6 safely since we won't need to search for any earlier
snapshots to get readable
// offsets
assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(6);
+ commitSnapshot(
+ tableId,
+ tablePath,
+ snapshot14,
+ tieredLakeSnapshotEndOffset,
+ retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable,
snapshot14));
+
+ // when the compacted snapshot is already registered in ZK,
+ // getReadableSnapshotAndOffsets skips recomputation and returns null.
+ long snapshot15 = writeAndCommitData(fileStoreTable,
Collections.emptyMap());
+ DvTableReadableSnapshotRetriever.ReadableSnapshotResult result15 =
+ retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable,
snapshot15);
+ assertThat(result15)
+ .as(
+ "Compacted snapshot 13 is already in ZK, should skip
recomputation and return null")
+ .isNull();
}
@Test
@@ -615,24 +631,11 @@ class DvTableReadableSnapshotRetrieverTest {
bucket0, generateRowsForPartition(partition1, bucket0,
3, 6));
long snapshot9 = writeAndCommitData(fileStoreTable, appendRowsP1More);
tieredLakeSnapshotEndOffset.put(tbP1B0, 6L);
+ // Snapshot 7 is already registered in Fluss cluster, so the retrieve
result is null
+ // (no update needed).
readableSnapshotAndOffsets =
retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable,
snapshot9);
- // readable_snapshot = snapshot7 (unchanged, partition1/bucket0 still
has L0 from snapshot9)
- // readable_offsets: partition0/bucket0 uses snapshot6's offset (6L),
- // partition0/bucket1 uses snapshot1's offset (3L),
- // partition1/bucket0 uses snapshot3's offset (3L) -
from snapshot5
- // compaction
-
assertThat(readableSnapshotAndOffsets.getReadableSnapshotId()).isEqualTo(snapshot7);
- expectedReadableOffsets = new HashMap<>();
- expectedReadableOffsets.put(tbP0B0, 6L);
- expectedReadableOffsets.put(tbP0B1, 3L);
- expectedReadableOffsets.put(tbP1B0, 3L);
- assertThat(readableSnapshotAndOffsets.getReadableOffsets())
- .isEqualTo(expectedReadableOffsets);
- // all buckets L0 level has been flushed in snapshot3, we can delete
all snapshots prior to
- // snapshot3 safely since we won't need to search for any earlier
snapshots to get readable
- // offsets
-
assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(3);
+ assertThat(readableSnapshotAndOffsets).isNull();
commitSnapshot(
tableId,
tablePath,
@@ -669,6 +672,69 @@ class DvTableReadableSnapshotRetrieverTest {
expectedReadableOffsets.put(tbP1B0, 6L);
assertThat(readableSnapshotAndOffsets.getReadableOffsets())
.isEqualTo(expectedReadableOffsets);
+ commitSnapshot(
+ tableId,
+ tablePath,
+ snapshot11,
+ tieredLakeSnapshotEndOffset,
+ readableSnapshotAndOffsets);
+
+ // Step 9: APPEND snapshot 12 - write more data to partition0, bucket1
+ // Snapshot 12 state:
+ // ┌─────────────┬─────────┬─────────────────────────────┐
+ // │ Partition │ Bucket │ Files │
+ // ├─────────────┼─────────┼─────────────────────────────┤
+ // │ partition0 │ bucket0 │ L1: [from s7 (rows 0-5)] │ ←
unchanged
+ // │ partition0 │ bucket1 │ L1: [from s7 (rows 0-2)] │
+ // │ │ │ L0: [rows 3-6] ← new │ ← added
in snapshot12
+ // │ partition1 │ bucket0 │ L1: [from s10 (rows 0-5)] │ ←
unchanged
+ // └─────────────┴─────────┴─────────────────────────────┘
+ Map<Integer, List<GenericRow>> appendRowsP0B1 =
+ Collections.singletonMap(
+ bucket1, generateRowsForPartition(partition0, bucket1,
3, 7));
+ long snapshot12 = writeAndCommitData(fileStoreTable, appendRowsP0B1);
+ tieredLakeSnapshotEndOffset.put(tbP0B1, 6L);
+ commitSnapshot(
+ tableId,
+ tablePath,
+ snapshot12,
+ tieredLakeSnapshotEndOffset,
+ retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable,
snapshot12));
+
+ // Step 10: COMPACT snapshot 13 - compact partition0, bucket1 again
(flushes snapshot12's
+ // L0)
+ // Snapshot 13 state (after compacting partition0, bucket1):
+ // ┌─────────────┬─────────┬─────────────────────────────┐
+ // │ Partition │ Bucket │ Files │
+ // ├─────────────┼─────────┼─────────────────────────────┤
+ // │ partition0 │ bucket0 │ L1: [from s7 (rows 0-5)] │ ←
unchanged
+ // │ partition0 │ bucket1 │ L1: [merged s7 L1 + s12 L0] │ ← s12's
L0 flushed to L1
+ // │ │ │ [rows 0-6 total] │
+ // │ partition1 │ bucket0 │ L1: [from s10 (rows 0-5)] │ ←
unchanged
+ // └─────────────┴─────────┴─────────────────────────────┘
+ compactHelper.compactBucket(partition0BinaryRow, bucket1).commit();
+ long snapshot13 = latestSnapshot(fileStoreTable);
+
+ // Create an empty tiered snapshot (snapshot14) to simulate tiered
snapshot commit
+ long snapshot14 = writeAndCommitData(fileStoreTable,
Collections.emptyMap());
+
+ readableSnapshotAndOffsets =
+ retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable,
snapshot14);
+ // readable_snapshot = snapshot13
+ // readable_offsets: partition0/bucket0 uses snapshot6's offset (6L),
+ // partition0/bucket1 uses snapshot12's offset (6L)
- flushed in s13,
+ // partition1/bucket0 uses snapshot9's offset (6L)
+
assertThat(readableSnapshotAndOffsets.getReadableSnapshotId()).isEqualTo(snapshot13);
+ expectedReadableOffsets = new HashMap<>();
+ expectedReadableOffsets.put(tbP0B0, 6L);
+ expectedReadableOffsets.put(tbP0B1, 6L);
+ expectedReadableOffsets.put(tbP1B0, 6L);
+ assertThat(readableSnapshotAndOffsets.getReadableOffsets())
+ .isEqualTo(expectedReadableOffsets);
+
+ // After compact 13, all buckets have no L0 in the compacted snapshot,
we only need
+ // to keep the previous append snapshot 12
+
assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(snapshot12);
}
private long latestSnapshot(FileStoreTable fileStoreTable) {