This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 0f22f788a4e1b61871e0059b80214fbbfcbd71d9 Author: yuxia Luo <[email protected]> AuthorDate: Thu Feb 12 15:01:22 2026 +0800 [lake/paimon] Skip readable snapshot recomputation when latest compacted snapshot already in ZK (#2640) --- .../utils/DvTableReadableSnapshotRetriever.java | 92 ++++++++++++++++---- .../DvTableReadableSnapshotRetrieverTest.java | 98 ++++++++++++++++++---- 2 files changed, 157 insertions(+), 33 deletions(-) diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java index 2499a5c44..54c3c7f94 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java @@ -23,11 +23,13 @@ import org.apache.fluss.client.ConnectionFactory; import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.LakeTableSnapshotNotExistException; import org.apache.fluss.lake.committer.LakeCommitResult; import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.ResolvedPartitionSpec; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.types.Tuple2; import org.apache.paimon.CoreOptions; @@ -107,7 +109,11 @@ public class DvTableReadableSnapshotRetriever implements AutoCloseable { * * <ol> * <li>Find the latest compacted snapshot before the given tiered snapshot - * <li>Check which buckets have no L0 files and which have L0 files in the compacted snapshot + * <li>Look up Fluss (ZK lake node) to see if this compacted snapshot is already registered. + * If it exists, skip recomputing tiered and readable offsets and return null (no update + * needed). This avoids redundant work when many APPEND snapshots follow a single COMPACT. + * <li>Otherwise, check which buckets have no L0 files and which have L0 files in the + * compacted snapshot * <li>For buckets without L0 files: use offsets from the latest tiered snapshot (all data is * in base files, safe to advance) * <li>For buckets with L0 files: @@ -136,6 +142,7 @@ public class DvTableReadableSnapshotRetriever implements AutoCloseable { * @return a tuple containing the readable snapshot ID (the latest compacted snapshot) and a map * of TableBucket to readable offset for all buckets, or null if: * <ul> + * <li>The latest compacted snapshot is already registered in Fluss (ZK); no update needed * <li>No compacted snapshot exists before the tiered snapshot * <li>Cannot find the latest snapshot holding flushed L0 files for some buckets * <li>Cannot find the previous APPEND snapshot for some buckets @@ -159,8 +166,44 @@ public class DvTableReadableSnapshotRetriever implements AutoCloseable { return null; } - // todo: optimize in #2626, if the compacted snapshot exists in zk, we can - // skip the follow check + LakeSnapshot lastCompactedLakeSnapshot = null; + + try { + // Attempt to retrieve the snapshot from Fluss. + // This is a blocking call to unwrap the future. + lastCompactedLakeSnapshot = + flussAdmin.getLakeSnapshot(tablePath, latestCompactedSnapshot.id()).get(); + } catch (Exception e) { + Throwable cause = ExceptionUtils.stripExecutionException(e); + + // If the error is anything other than the snapshot simply not existing, + // we log a warning but do not interrupt the flow. + if (!(cause instanceof LakeTableSnapshotNotExistException)) { + LOG.warn( + "Failed to retrieve lake snapshot {} from Fluss. " + + "Will attempt to advance readable snapshot as a fallback.", + latestCompactedSnapshot.id(), + cause); + } + // If LakeTableSnapshotNotExistException occurs, we silently fall through + // as it is an expected case when the snapshot hasn't been recorded yet. + } + + // If we successfully retrieved a snapshot, we must validate its integrity. + if (lastCompactedLakeSnapshot != null) { + // Consistency Check: The ID in Fluss must strictly match the expected compacted ID. + // Should never happen + // If they differ, it indicates a critical state mismatch in the metadata. + checkState( + lastCompactedLakeSnapshot.getSnapshotId() == latestCompactedSnapshot.id(), + "Snapshot ID mismatch detected! Expected: %s, Actual in Fluss: %s", + latestCompactedSnapshot.id(), + lastCompactedLakeSnapshot.getSnapshotId()); + + // If the snapshot already exists and is valid, no further action (advancing) is + // required. + return null; + } Map<TableBucket, Long> readableOffsets = new HashMap<>(); @@ -202,6 +245,31 @@ public class DvTableReadableSnapshotRetriever implements AutoCloseable { } } + Snapshot compactedSnapshotPreviousAppendSnapshot = + findPreviousSnapshot(latestCompactedSnapshot.id(), Snapshot.CommitKind.APPEND); + if (compactedSnapshotPreviousAppendSnapshot == null) { + LOG.warn( + "Failed to find a previous APPEND snapshot before compacted snapshot {} for table {}. " + + "This prevents retrieving baseline offsets from Fluss.", + latestCompactedSnapshot.id(), + tablePath); + return null; + } + + // We keep snapshots because for a compacted snapshot, if a bucket has L0, we find the + // snapshot that exactly holds those L0, then use that snapshot's previous APPEND's tiered + // offset as the readable offset (that offset is safe to read). When the current compacted + // snapshot has no L0 in any bucket, we do not traverse; for any later compact we would + // traverse to (going backwards in time), if some bucket has L0, the snapshot that exactly + // holds that L0 must be after the current compacted snapshot on the timeline. So that + // snapshot's previous APPEND cannot be earlier than the current compacted snapshot's + // previous APPEND. Therefore the minimum snapshot we need to keep is the current compact's + // previous APPEND; set earliestSnapshotIdToKeep to it so it is not deleted. Earlier + // snapshots may be safely deleted. + if (bucketsWithL0.isEmpty()) { + earliestSnapshotIdToKeep = compactedSnapshotPreviousAppendSnapshot.id(); + } + // for all buckets with l0, we need to find the latest compacted snapshot which flushed // the buckets, the per-bucket offset should be updated to the corresponding compacted // snapshot offsets @@ -336,21 +404,11 @@ public class DvTableReadableSnapshotRetriever implements AutoCloseable { return null; } - // to get the the tiered offset for the readable snapshot, - Snapshot previousSnapshot = - findPreviousSnapshot(latestCompactedSnapshot.id(), Snapshot.CommitKind.APPEND); - if (previousSnapshot == null) { - LOG.warn( - "Failed to find a previous APPEND snapshot before compacted snapshot {} for table {}. " - + "This prevents retrieving baseline offsets from Fluss.", - latestCompactedSnapshot.id(), - tablePath); - return null; - } - - long previousSnapshotId = previousSnapshot.id(); + // we use the previous append snapshot tiered offset of the compacted snapshot as the + // compacted snapshot tiered offsets LakeSnapshot tieredLakeSnapshot = - getOrFetchLakeSnapshot(previousSnapshotId, lakeSnapshotBySnapshotId); + getOrFetchLakeSnapshot( + compactedSnapshotPreviousAppendSnapshot.id(), lakeSnapshotBySnapshotId); if (tieredLakeSnapshot == null) { return null; } diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java index 912c28285..a5a12e214 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java @@ -413,6 +413,22 @@ class DvTableReadableSnapshotRetrieverTest { // snapshot6 safely since we won't need to search for any earlier snapshots to get readable // offsets assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(6); + commitSnapshot( + tableId, + tablePath, + snapshot14, + tieredLakeSnapshotEndOffset, + retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot14)); + + // when the compacted snapshot is already registered in ZK, + // getReadableSnapshotAndOffsets skips recomputation and returns null. + long snapshot15 = writeAndCommitData(fileStoreTable, Collections.emptyMap()); + DvTableReadableSnapshotRetriever.ReadableSnapshotResult result15 = + retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot15); + assertThat(result15) + .as( + "Compacted snapshot 13 is already in ZK, should skip recomputation and return null") + .isNull(); } @Test @@ -615,24 +631,11 @@ class DvTableReadableSnapshotRetrieverTest { bucket0, generateRowsForPartition(partition1, bucket0, 3, 6)); long snapshot9 = writeAndCommitData(fileStoreTable, appendRowsP1More); tieredLakeSnapshotEndOffset.put(tbP1B0, 6L); + // Snapshot 7 is already registered in Fluss cluster, so the retrieve result is null + // (no update needed). readableSnapshotAndOffsets = retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot9); - // readable_snapshot = snapshot7 (unchanged, partition1/bucket0 still has L0 from snapshot9) - // readable_offsets: partition0/bucket0 uses snapshot6's offset (6L), - // partition0/bucket1 uses snapshot1's offset (3L), - // partition1/bucket0 uses snapshot3's offset (3L) - from snapshot5 - // compaction - assertThat(readableSnapshotAndOffsets.getReadableSnapshotId()).isEqualTo(snapshot7); - expectedReadableOffsets = new HashMap<>(); - expectedReadableOffsets.put(tbP0B0, 6L); - expectedReadableOffsets.put(tbP0B1, 3L); - expectedReadableOffsets.put(tbP1B0, 3L); - assertThat(readableSnapshotAndOffsets.getReadableOffsets()) - .isEqualTo(expectedReadableOffsets); - // all buckets L0 level has been flushed in snapshot3, we can delete all snapshots prior to - // snapshot3 safely since we won't need to search for any earlier snapshots to get readable - // offsets - assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(3); + assertThat(readableSnapshotAndOffsets).isNull(); commitSnapshot( tableId, tablePath, @@ -669,6 +672,69 @@ class DvTableReadableSnapshotRetrieverTest { expectedReadableOffsets.put(tbP1B0, 6L); assertThat(readableSnapshotAndOffsets.getReadableOffsets()) .isEqualTo(expectedReadableOffsets); + commitSnapshot( + tableId, + tablePath, + snapshot11, + tieredLakeSnapshotEndOffset, + readableSnapshotAndOffsets); + + // Step 9: APPEND snapshot 12 - write more data to partition0, bucket1 + // Snapshot 12 state: + // ┌─────────────┬─────────┬─────────────────────────────┐ + // │ Partition │ Bucket │ Files │ + // ├─────────────┼─────────┼─────────────────────────────┤ + // │ partition0 │ bucket0 │ L1: [from s7 (rows 0-5)] │ ← unchanged + // │ partition0 │ bucket1 │ L1: [from s7 (rows 0-2)] │ + // │ │ │ L0: [rows 3-6] ← new │ ← added in snapshot12 + // │ partition1 │ bucket0 │ L1: [from s10 (rows 0-5)] │ ← unchanged + // └─────────────┴─────────┴─────────────────────────────┘ + Map<Integer, List<GenericRow>> appendRowsP0B1 = + Collections.singletonMap( + bucket1, generateRowsForPartition(partition0, bucket1, 3, 7)); + long snapshot12 = writeAndCommitData(fileStoreTable, appendRowsP0B1); + tieredLakeSnapshotEndOffset.put(tbP0B1, 6L); + commitSnapshot( + tableId, + tablePath, + snapshot12, + tieredLakeSnapshotEndOffset, + retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot12)); + + // Step 10: COMPACT snapshot 13 - compact partition0, bucket1 again (flushes snapshot12's + // L0) + // Snapshot 13 state (after compacting partition0, bucket1): + // ┌─────────────┬─────────┬─────────────────────────────┐ + // │ Partition │ Bucket │ Files │ + // ├─────────────┼─────────┼─────────────────────────────┤ + // │ partition0 │ bucket0 │ L1: [from s7 (rows 0-5)] │ ← unchanged + // │ partition0 │ bucket1 │ L1: [merged s7 L1 + s12 L0] │ ← s12's L0 flushed to L1 + // │ │ │ [rows 0-6 total] │ + // │ partition1 │ bucket0 │ L1: [from s10 (rows 0-5)] │ ← unchanged + // └─────────────┴─────────┴─────────────────────────────┘ + compactHelper.compactBucket(partition0BinaryRow, bucket1).commit(); + long snapshot13 = latestSnapshot(fileStoreTable); + + // Create an empty tiered snapshot (snapshot14) to simulate tiered snapshot commit + long snapshot14 = writeAndCommitData(fileStoreTable, Collections.emptyMap()); + + readableSnapshotAndOffsets = + retriveReadableSnapshotAndOffsets(tablePath, fileStoreTable, snapshot14); + // readable_snapshot = snapshot13 + // readable_offsets: partition0/bucket0 uses snapshot6's offset (6L), + // partition0/bucket1 uses snapshot12's offset (6L) - flushed in s13, + // partition1/bucket0 uses snapshot9's offset (6L) + assertThat(readableSnapshotAndOffsets.getReadableSnapshotId()).isEqualTo(snapshot13); + expectedReadableOffsets = new HashMap<>(); + expectedReadableOffsets.put(tbP0B0, 6L); + expectedReadableOffsets.put(tbP0B1, 6L); + expectedReadableOffsets.put(tbP1B0, 6L); + assertThat(readableSnapshotAndOffsets.getReadableOffsets()) + .isEqualTo(expectedReadableOffsets); + + // After compact 13, all buckets have no L0 in the compacted snapshot, we only need + // to keep the previous append snapshot 12 + assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(snapshot12); } private long latestSnapshot(FileStoreTable fileStoreTable) {
