wuchong commented on code in PR #2326:
URL: https://github.com/apache/fluss/pull/2326#discussion_r2782733136


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CommitLakeTableSnapshotEvent.java:
##########
@@ -18,26 +18,26 @@
 package org.apache.fluss.server.coordinator.event;
 
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
-import org.apache.fluss.server.entity.CommitLakeTableSnapshotData;
+import org.apache.fluss.server.entity.CommitLakeTableSnapshotsData;
 
 import java.util.concurrent.CompletableFuture;
 
 /** An event for receiving the request of commit lakehouse data to coordinator server. */
 public class CommitLakeTableSnapshotEvent implements CoordinatorEvent {
 
-    private final CommitLakeTableSnapshotData commitLakeTableSnapshotData;
+    private final CommitLakeTableSnapshotsData commitLakeTableSnapshotsData;
 
     private final CompletableFuture<CommitLakeTableSnapshotResponse> respCallback;
 
     public CommitLakeTableSnapshotEvent(
-            CommitLakeTableSnapshotData commitLakeTableSnapshotData,
+            CommitLakeTableSnapshotsData commitLakeTableSnapshotsData,
             CompletableFuture<CommitLakeTableSnapshotResponse> respCallback) {
-        this.commitLakeTableSnapshotData = commitLakeTableSnapshotData;
+        this.commitLakeTableSnapshotsData = commitLakeTableSnapshotsData;
         this.respCallback = respCallback;
     }
 
-    public CommitLakeTableSnapshotData getCommitLakeTableSnapshotData() {
-        return commitLakeTableSnapshotData;
+    public CommitLakeTableSnapshotsData getCommitLakeTableSnapshotData() {

Review Comment:
   nit: rename the method to `getCommitLakeTableSnapshotsData` so it matches the new `CommitLakeTableSnapshotsData` type.



##########
fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java:
##########
@@ -451,31 +444,48 @@ public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
     }
 
     @Override
-    public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
-            GetLatestLakeSnapshotRequest request) {
+    public CompletableFuture<GetLakeSnapshotResponse> getLakeSnapshot(
+            GetLakeSnapshotRequest request) {
+        // get table info
         TablePath tablePath = toTablePath(request.getTablePath());
         authorizeTable(OperationType.DESCRIBE, tablePath);
 
         // get table info
         TableInfo tableInfo = metadataManager.getTable(tablePath);
         // get table id
         long tableId = tableInfo.getTableId();
-        CompletableFuture<GetLatestLakeSnapshotResponse> resultFuture = new CompletableFuture<>();
+        CompletableFuture<GetLakeSnapshotResponse> resultFuture = new CompletableFuture<>();
+        boolean readableSnapshot = request.hasReadable() && request.isReadable();
         ioExecutor.execute(
                 () -> {
-                    Optional<LakeTableSnapshot> optLakeTableSnapshot;
                     try {
-                        optLakeTableSnapshot = zkClient.getLakeTableSnapshot(tableId);
-                        if (!optLakeTableSnapshot.isPresent()) {
-                            resultFuture.completeExceptionally(
-                                    new LakeTableSnapshotNotExistException(
-                                            String.format(
-                                                    "Lake table snapshot not exist for table: %s, table id: %d",
-                                                    tablePath, tableId)));
-                        } else {
-                            LakeTableSnapshot lakeTableSnapshot = optLakeTableSnapshot.get();
+                        Optional<LakeTableSnapshot> optSnapshot =
+                                readableSnapshot
+                                        ? zkClient.getLatestReadableLakeTableSnapshot(tableId)
+                                        : zkClient.getLakeTableSnapshot(
+                                                tableId,
+                                                request.hasSnapshotId()
+                                                        ? request.getSnapshotId()
+                                                        : null);

Review Comment:
   What is the behavior when both `snapshot_id` and `readable_snapshot` are set?
   I don't think we support that combination, so we should throw an exception
   in that case.
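
A minimal sketch of the validation suggested above, assuming the two options are mutually exclusive. `LakeSnapshotQuery` and its factory method are hypothetical stand-ins for the real request handling, not existing Fluss classes; the exception type is also an assumption.

```java
/** Hypothetical stand-in for the two request fields under discussion. */
final class LakeSnapshotQuery {
    private final Long snapshotId; // null when snapshot_id is not set
    private final boolean readable; // true when the latest readable snapshot is requested

    private LakeSnapshotQuery(Long snapshotId, boolean readable) {
        this.snapshotId = snapshotId;
        this.readable = readable;
    }

    /** Builds a query, rejecting requests that set both options at once. */
    static LakeSnapshotQuery of(Long snapshotId, boolean readable) {
        if (snapshotId != null && readable) {
            throw new IllegalArgumentException(
                    "snapshot_id and readable_snapshot must not both be set");
        }
        return new LakeSnapshotQuery(snapshotId, readable);
    }

    Long snapshotId() {
        return snapshotId;
    }

    boolean readable() {
        return readable;
    }
}
```

Failing fast before the ZooKeeper lookup keeps the ternary in the handler from silently preferring one option over the other.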



##########
fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.committer;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.TableBucket;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of a lake commit operation, containing the committed snapshot ID and the readable
+ * snapshot information.
+ *
+ * <p>For most implementations, the readable snapshot is the same as the committed snapshot, and the
+ * readable log offsets are the same as the tiered offsets from TieringCommitOperator.
+ *
+ * <p>For Paimon DV tables, the readable snapshot will be different from the committed snapshot, and
+ * the log end offsets will be different as well (based on compaction status).
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public class LakeCommitResult {
+
+    // -1 to enforce to keep all previous snapshots
+    public static final Long KEEP_ALL_PREVIOUS = -1L;
+
+    // The snapshot ID that was just committed
+    private final long committedSnapshotId;
+
+    private final boolean committedIsReadable;
+
+    // The earliest snapshot ID to keep, null means not to keep any previous snapshot
+    @Nullable private final Long earliestSnapshotIDToKeep;
+
+    // the readable snapshot, null if
+    // 1: the readable snapshot is unknown,
+    // 2: committedIsReadable is true, committed snapshot is just also readable
+    @Nullable private final ReadableSnapshot readableSnapshot;
+
+    private LakeCommitResult(
+            long committedSnapshotId,
+            boolean committedIsReadable,
+            @Nullable ReadableSnapshot readableSnapshot,
+            @Nullable Long earliestSnapshotIDToKeep) {
+        this.committedSnapshotId = committedSnapshotId;
+        this.committedIsReadable = committedIsReadable;
+        this.readableSnapshot = readableSnapshot;
+        this.earliestSnapshotIDToKeep = earliestSnapshotIDToKeep;
+    }
+
+    public static LakeCommitResult committedIsReadable(long committedSnapshotId) {
+        return new LakeCommitResult(committedSnapshotId, true, null, null);

Review Comment:
   Can we introduce a `public static final Long KEEP_LATEST = null;` and use
   `KEEP_LATEST` rather than a bare `null` here, and possibly in other places
   as well?
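
A small sketch of how the named constant could read at call sites. `SnapshotRetention` and `describe` are hypothetical names used only for illustration; the two constants mirror `KEEP_ALL_PREVIOUS` from the diff and the proposed `KEEP_LATEST`.

```java
import java.util.Objects;

/** Hypothetical holder for the retention markers discussed above. */
final class SnapshotRetention {
    // -1 means all previous snapshots are kept (mirrors KEEP_ALL_PREVIOUS in the diff).
    static final Long KEEP_ALL_PREVIOUS = -1L;
    // Proposed constant: null means no previous snapshot is kept, only the latest one.
    static final Long KEEP_LATEST = null;

    private SnapshotRetention() {}

    /** Describes a retention value; Objects.equals is null-safe, so KEEP_LATEST works too. */
    static String describe(Long earliestSnapshotIdToKeep) {
        if (Objects.equals(earliestSnapshotIdToKeep, KEEP_LATEST)) {
            return "keep latest only";
        }
        if (Objects.equals(earliestSnapshotIdToKeep, KEEP_ALL_PREVIOUS)) {
            return "keep all previous";
        }
        return "keep from snapshot " + earliestSnapshotIdToKeep;
    }
}
```

With this in place, a call such as `new LakeCommitResult(committedSnapshotId, true, null, KEEP_LATEST)` states the retention intent instead of passing two indistinguishable `null` arguments.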



##########
fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java:
##########
@@ -51,4 +51,11 @@ public interface CommitterInitContext {
      * @return the lake tiering config
      */
     Configuration lakeTieringConfig();
+
+    /**
+     * Returns the fluss config.
+     *
+     * @return the fluss config
+     */
+    Configuration flussConfig();

Review Comment:
   Nit: consider renaming this to `flussClientConfig()`.  
   
   The current name is a bit confusing alongside `lakeTieringConfig()`, since 
both are instances of Fluss’s `Configuration` class—just serving different 
purposes. Renaming it to `flussClientConfig()` and adding a clarifying comment 
(e.g., “This configuration can be used to build a Fluss client, such as a 
`Connection`”) would make the intent much clearer for users.
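
For illustration, the renamed accessor with the clarifying Javadoc might look like the sketch below. The `Configuration` class here is a hypothetical minimal stand-in so the snippet compiles on its own; it is not Fluss's real `Configuration`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal stand-in for Fluss's Configuration class. */
final class Configuration {
    private final Map<String, String> values;

    Configuration(Map<String, String> values) {
        this.values = Collections.unmodifiableMap(new HashMap<>(values));
    }

    String get(String key) {
        return values.get(key);
    }
}

/** Sketch of the interface with the suggested method name. */
interface CommitterInitContext {

    /**
     * Returns the lake tiering config.
     *
     * @return the lake tiering config
     */
    Configuration lakeTieringConfig();

    /**
     * Returns the Fluss client config. This configuration can be used to build a Fluss client,
     * such as a Connection.
     *
     * @return the Fluss client config
     */
    Configuration flussClientConfig();
}
```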



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.committer.LakeCommitResult;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.lake.paimon.utils.PaimonDvTableUtils.findLatestSnapshotExactlyHoldingL0Files;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A retriever to retrieve the readable snapshot and offsets for Paimon 
deletion vector enabled
+ * table.
+ */
+public class DvTableReadableSnapshotRetriever implements AutoCloseable {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DvTableReadableSnapshotRetriever.class);
+
+    private final TablePath tablePath;
+    private final long tableId;
+    private final FileStoreTable fileStoreTable;
+    private final Admin flussAdmin;
+    private final Connection flussConnection;
+    private final SnapshotManager snapshotManager;
+
+    public DvTableReadableSnapshotRetriever(
+            TablePath tablePath,
+            long tableId,
+            FileStoreTable paimonFileStoreTable,
+            Configuration flussConfig) {
+        this.tablePath = tablePath;
+        this.tableId = tableId;
+        this.fileStoreTable = paimonFileStoreTable;
+        this.flussConnection = ConnectionFactory.createConnection(flussConfig);
+        this.flussAdmin = flussConnection.getAdmin();
+        this.snapshotManager = fileStoreTable.snapshotManager();
+    }
+
+    /**
+     * Get readable offsets for DV tables based on the latest compacted 
snapshot.
+     *
+     * <p>For Paimon DV tables, when an appended snapshot is committed, we 
need to check the latest
+     * compacted snapshot to determine readable offsets for each bucket. This 
method implements
+     * incremental advancement of readable_snapshot per bucket:
+     *
+     * <ul>
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot. These
+     *       buckets can advance their readable offsets since all their data 
is in base files (L1+).
+     *   <li>For buckets with L0 files: traverse backwards through compacted 
snapshots to find the
+     *       latest one that flushed this bucket's L0 files. Then find the 
latest snapshot that
+     *       exactly holds those flushed L0 files, and use the previous APPEND 
snapshot's offset for
+     *       that bucket.
+     * </ul>
+     *
+     * <p>Algorithm:
+     *
+     * <ol>
+     *   <li>Find the latest compacted snapshot before the given tiered 
snapshot
+     *   <li>Check which buckets have no L0 files and which have L0 files in 
the compacted snapshot
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot (all data is
+     *       in base files, safe to advance)
+     *   <li>For buckets with L0 files:
+     *       <ol>
+     *         <li>Traverse backwards through compacted snapshots starting 
from the latest one
+     *         <li>For each compacted snapshot, check which buckets had their 
L0 files flushed
+     *         <li>For each flushed bucket, find the latest snapshot that 
exactly holds those L0
+     *             files using {@link 
PaimonDvTableUtils#findLatestSnapshotExactlyHoldingL0Files}
+     *         <li>Find the previous APPEND snapshot before that snapshot
+     *         <li>Use that APPEND snapshot's offset for the bucket
+     *       </ol>
+     *   <li>Return readable offsets for all buckets, allowing incremental 
advancement
+     * </ol>
+     *
+     * <p>Note: This allows readable_snapshot to advance incrementally per 
bucket. Each bucket's
+     * readable offset is set to the maximum offset that is actually readable 
in the compacted
+     * snapshot, ensuring no data duplication or loss. The readable_snapshot 
is set to the latest
+     * compacted snapshot ID, and each bucket continues reading from its 
respective readable offset.
+     *
+     * <p>Example: If bucket0's L0 files were flushed in snapshot5 (which 
compacted snapshot1's L0
+     * files), and snapshot4 is the latest snapshot that exactly holds those 
L0 files, then
+     * bucket0's readable offset will be set to snapshot4's previous APPEND 
snapshot's offset.
+     *
+     * @param tieredSnapshotId the tiered snapshot ID (the appended snapshot 
that was just
+     *     committed)
+     * @return a tuple containing the readable snapshot ID (the latest 
compacted snapshot) and a map
+     *     of TableBucket to readable offset for all buckets, or null if:
+     *     <ul>
+     *       <li>No compacted snapshot exists before the tiered snapshot
+     *       <li>Cannot find the latest snapshot holding flushed L0 files for 
some buckets
+     *       <li>Cannot find the previous APPEND snapshot for some buckets
+     *       <li>Cannot find offsets in Fluss for some buckets
+     *     </ul>
+     *     The map contains offsets for ALL buckets, allowing incremental 
advancement.
+     * @throws IOException if an error occurs reading snapshots or offsets 
from Fluss
+     */
+    @Nullable
+    public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotId)
+            throws IOException {
+        // Find the latest compacted snapshot
+        Snapshot latestCompactedSnapshot =
+                findPreviousSnapshot(tieredSnapshotId, Snapshot.CommitKind.COMPACT);
+        if (latestCompactedSnapshot == null) {
+            // No compacted snapshot found, may happen when no compaction happens or snapshot
+            // expiration, we can't update readable offsets, return null directly
+            LOG.info(
+                    "Can't find latest compacted snapshot before snapshot {}, skip get readable snapshot.",
+                    tieredSnapshotId);
+            return null;
+        }
+

Review Comment:
   Could you add a TODO comment here to optimize performance? Specifically, we 
can first look up ZooKeeper to check whether `latestCompactedSnapshot.id` has 
already been registered in the ZK lake node. If it exists, there’s no need to 
recompute the tiered offsets and readable offsets for this snapshot ID.
   
   In practice, it’s common to have around 10 `APPEND` snapshots following a 
single `COMPACT` snapshot, so skipping redundant recomputation would 
significantly improve the efficiency of the tiering service.
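
The short-circuit described above could be sketched as follows. This is only an illustration under assumptions: `ReadableSnapshotCacheCheck` is a hypothetical helper, and the `alreadyRegistered` predicate stands in for whatever ZooKeeper lake-node lookup the real code would use.

```java
import java.util.Optional;
import java.util.function.LongFunction;
import java.util.function.LongPredicate;

/** Hypothetical helper that skips recomputation for already registered snapshots. */
final class ReadableSnapshotCacheCheck {

    private ReadableSnapshotCacheCheck() {}

    /**
     * Runs the expensive computation only when the compacted snapshot ID is not registered yet.
     *
     * @param compactedSnapshotId ID of the latest compacted snapshot
     * @param alreadyRegistered stand-in for the ZK lookup of the lake node
     * @param compute the expensive offsets computation
     * @return the computed result, or empty if the snapshot was already registered
     */
    static <T> Optional<T> computeUnlessRegistered(
            long compactedSnapshotId,
            LongPredicate alreadyRegistered,
            LongFunction<T> compute) {
        if (alreadyRegistered.test(compactedSnapshotId)) {
            // Nothing new to publish for this compacted snapshot.
            return Optional.empty();
        }
        return Optional.ofNullable(compute.apply(compactedSnapshotId));
    }
}
```

With roughly 10 `APPEND` snapshots per `COMPACT` snapshot, only the first call after each compaction would reach the expensive computation.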



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.committer.LakeCommitResult;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.lake.paimon.utils.PaimonDvTableUtils.findLatestSnapshotExactlyHoldingL0Files;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A retriever to retrieve the readable snapshot and offsets for Paimon 
deletion vector enabled
+ * table.
+ */
+public class DvTableReadableSnapshotRetriever implements AutoCloseable {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DvTableReadableSnapshotRetriever.class);
+
+    private final TablePath tablePath;
+    private final long tableId;
+    private final FileStoreTable fileStoreTable;
+    private final Admin flussAdmin;
+    private final Connection flussConnection;
+    private final SnapshotManager snapshotManager;
+
+    public DvTableReadableSnapshotRetriever(
+            TablePath tablePath,
+            long tableId,
+            FileStoreTable paimonFileStoreTable,
+            Configuration flussConfig) {
+        this.tablePath = tablePath;
+        this.tableId = tableId;
+        this.fileStoreTable = paimonFileStoreTable;
+        this.flussConnection = ConnectionFactory.createConnection(flussConfig);
+        this.flussAdmin = flussConnection.getAdmin();
+        this.snapshotManager = fileStoreTable.snapshotManager();
+    }
+
+    /**
+     * Get readable offsets for DV tables based on the latest compacted 
snapshot.
+     *
+     * <p>For Paimon DV tables, when an appended snapshot is committed, we 
need to check the latest
+     * compacted snapshot to determine readable offsets for each bucket. This 
method implements
+     * incremental advancement of readable_snapshot per bucket:
+     *
+     * <ul>
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot. These
+     *       buckets can advance their readable offsets since all their data 
is in base files (L1+).
+     *   <li>For buckets with L0 files: traverse backwards through compacted 
snapshots to find the
+     *       latest one that flushed this bucket's L0 files. Then find the 
latest snapshot that
+     *       exactly holds those flushed L0 files, and use the previous APPEND 
snapshot's offset for
+     *       that bucket.
+     * </ul>
+     *
+     * <p>Algorithm:
+     *
+     * <ol>
+     *   <li>Find the latest compacted snapshot before the given tiered 
snapshot
+     *   <li>Check which buckets have no L0 files and which have L0 files in 
the compacted snapshot
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot (all data is
+     *       in base files, safe to advance)
+     *   <li>For buckets with L0 files:
+     *       <ol>
+     *         <li>Traverse backwards through compacted snapshots starting 
from the latest one
+     *         <li>For each compacted snapshot, check which buckets had their 
L0 files flushed
+     *         <li>For each flushed bucket, find the latest snapshot that 
exactly holds those L0
+     *             files using {@link 
PaimonDvTableUtils#findLatestSnapshotExactlyHoldingL0Files}
+     *         <li>Find the previous APPEND snapshot before that snapshot
+     *         <li>Use that APPEND snapshot's offset for the bucket
+     *       </ol>
+     *   <li>Return readable offsets for all buckets, allowing incremental 
advancement
+     * </ol>
+     *
+     * <p>Note: This allows readable_snapshot to advance incrementally per 
bucket. Each bucket's
+     * readable offset is set to the maximum offset that is actually readable 
in the compacted
+     * snapshot, ensuring no data duplication or loss. The readable_snapshot 
is set to the latest
+     * compacted snapshot ID, and each bucket continues reading from its 
respective readable offset.
+     *
+     * <p>Example: If bucket0's L0 files were flushed in snapshot5 (which 
compacted snapshot1's L0
+     * files), and snapshot4 is the latest snapshot that exactly holds those 
L0 files, then
+     * bucket0's readable offset will be set to snapshot4's previous APPEND 
snapshot's offset.
+     *
+     * @param tieredSnapshotId the tiered snapshot ID (the appended snapshot 
that was just
+     *     committed)
+     * @return a tuple containing the readable snapshot ID (the latest 
compacted snapshot) and a map
+     *     of TableBucket to readable offset for all buckets, or null if:
+     *     <ul>
+     *       <li>No compacted snapshot exists before the tiered snapshot
+     *       <li>Cannot find the latest snapshot holding flushed L0 files for 
some buckets
+     *       <li>Cannot find the previous APPEND snapshot for some buckets
+     *       <li>Cannot find offsets in Fluss for some buckets
+     *     </ul>
+     *     The map contains offsets for ALL buckets, allowing incremental 
advancement.
+     * @throws IOException if an error occurs reading snapshots or offsets 
from Fluss
+     */
+    @Nullable
+    public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotId)
+            throws IOException {
+        // Find the latest compacted snapshot
+        Snapshot latestCompactedSnapshot =
+                findPreviousSnapshot(tieredSnapshotId, Snapshot.CommitKind.COMPACT);
+        if (latestCompactedSnapshot == null) {
+            // No compacted snapshot found, may happen when no compaction happens or snapshot
+            // expiration, we can't update readable offsets, return null directly
+            LOG.info(
+                    "Can't find latest compacted snapshot before snapshot {}, skip get readable snapshot.",
+                    tieredSnapshotId);
+            return null;
+        }
+
+        Map<TableBucket, Long> readableOffsets = new HashMap<>();
+
+        FlussTableBucketMapper flussTableBucketMapper = new FlussTableBucketMapper();
+
+        // get all the bucket without l0 files and with l0 files
+        Tuple2<Set<PartitionBucket>, Set<PartitionBucket>> bucketsWithoutL0AndWithL0 =
+                getBucketsWithAndWithoutL0AndWithL0(latestCompactedSnapshot);
+        Set<PartitionBucket> bucketsWithoutL0 = bucketsWithoutL0AndWithL0.f0;
+        Set<PartitionBucket> bucketsWithL0 = bucketsWithoutL0AndWithL0.f1;
+
+        // Track the earliest previousAppendSnapshot ID that was accessed
+        // This represents the oldest snapshot that might still be needed
+        long earliestSnapshotIdToKeep = LakeCommitResult.KEEP_ALL_PREVIOUS;
+
+        if (!bucketsWithoutL0.isEmpty()) {
+            // Get latest tiered offsets
+            LakeSnapshot latestTieredSnapshot;
+            try {
+                latestTieredSnapshot = flussAdmin.getLatestLakeSnapshot(tablePath).get();
+            } catch (Exception e) {
+                throw new IOException(
+                        "Failed to read lake snapshot from Fluss server for snapshot "
+                                + latestCompactedSnapshot.id(),
+                        e);

Review Comment:
   Maybe just log the exception and return null here, to avoid aborting the snapshot?
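
A minimal sketch of the log-and-return-null pattern, assuming a null result already means "skip updating the readable snapshot" as the method's Javadoc describes. `BestEffortLookup` is a hypothetical helper, and it uses `java.util.logging` only to stay self-contained; the class under review uses SLF4J.

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: a failed lookup is logged and reported as null instead of thrown. */
final class BestEffortLookup {
    private static final Logger LOG = Logger.getLogger(BestEffortLookup.class.getName());

    private BestEffortLookup() {}

    /**
     * Runs the lookup and returns its result, or null if it throws.
     *
     * @param lookup the call that may fail, e.g. reading the latest lake snapshot
     * @param description short text used in the warning message
     */
    static <T> T getOrNull(Callable<T> lookup, String description) {
        try {
            return lookup.call();
        } catch (Exception e) {
            // Log with the stack trace so the failure stays visible, then degrade gracefully.
            LOG.log(Level.WARNING, "Failed to " + description + ", skipping.", e);
            return null;
        }
    }
}
```

The trade-off is that a persistent failure no longer aborts the commit, so the warning log becomes the only signal that readable offsets are not advancing.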



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonDvTableUtils.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/** Utils for Paimon delete-vector enabled table. */
+public class PaimonDvTableUtils {
+
+    /**
+     * Find the latest snapshot that still holds the L0 files flushed by the 
given compacted
+     * snapshot.
+     *
+     * <p>The method works by:
+     *
+     * <ol>
+     *   <li>Getting the delta of the compacted snapshot to find deleted L0 
files
+     *   <li>Grouping deleted L0 files by bucket
+     *   <li>Searching backwards through previous snapshots to find the latest 
one whose L0 files
+     *       exactly match the deleted L0 files for each bucket
+     * </ol>
+     *
+     * <p>This snapshot is the most recent snapshot that still contains all 
the L0 files that were
+     * flushed (deleted) by the compacted snapshot.
+     *
+     * @param fileStoreTable the FileStoreTable instance
+     * @param compactedSnapshot the compacted snapshot whose flushed L0 files 
to search for
+     * @return the latest snapshot that still holds these L0 files, or null if 
not found
+     * @throws IOException if an error occurs
+     */
+    @Nullable
+    public static Snapshot findLatestSnapshotExactlyHoldingL0Files(
+            FileStoreTable fileStoreTable, Snapshot compactedSnapshot) throws IOException {
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+        checkState(compactedSnapshot.commitKind() == Snapshot.CommitKind.COMPACT);
+        // Get deleted L0 files from the compacted snapshot's delta
+        Map<Tuple2<BinaryRow, Integer>, Set<String>> deletedL0FilesByBucket =
+                getDeletedL0FilesByBucket(fileStoreTable, compactedSnapshot);
+
+        if (deletedL0FilesByBucket.isEmpty()) {
+            // No L0 files were deleted, can't find a snapshot holding these 
L0 files,
+            // return null directly
+            return null;
+        }
+
+        // Search backwards from the compacted snapshot to find the latest snapshot
+        // that still holds these L0 files
+        long earliestSnapshot = checkNotNull(snapshotManager.earliestSnapshotId());
+        for (long snapshot = compactedSnapshot.id() - 1; snapshot >= earliestSnapshot; snapshot--) {
+            Snapshot candidateSnapshot = snapshotManager.tryGetSnapshot(snapshot);
+            if (candidateSnapshot == null) {
+                // no such snapshot in paimon, skip
+                continue;
+            }
+            if (matchesDeletedL0Files(fileStoreTable, candidateSnapshot, deletedL0FilesByBucket)) {
+                return candidateSnapshot;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Get deleted L0 files grouped by bucket from a compacted snapshot's 
delta.
+     *
+     * @param compactedSnapshot the compacted snapshot
+     * @return a map from bucket ID to set of deleted L0 file names
+     */
+    private static Map<Tuple2<BinaryRow, Integer>, Set<String>> getDeletedL0FilesByBucket(

Review Comment:
   Ditto: use `PaimonPartitionBucket` as the map key here instead of `Tuple2<BinaryRow, Integer>`.
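
To illustrate why a named key type reads better than a tuple, here is a generic sketch. `PartitionBucketKey` is a hypothetical class written for this example; the actual `PaimonPartitionBucket` is not shown in this hunk and may be shaped differently (it would presumably hold a Paimon `BinaryRow` partition).

```java
import java.util.Objects;

/** Hypothetical value type identifying a bucket within a partition; usable as a map key. */
final class PartitionBucketKey<P> {
    private final P partition;
    private final int bucket;

    PartitionBucketKey(P partition, int bucket) {
        this.partition = partition;
        this.bucket = bucket;
    }

    P partition() {
        return partition;
    }

    int bucket() {
        return bucket;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PartitionBucketKey)) {
            return false;
        }
        PartitionBucketKey<?> that = (PartitionBucketKey<?>) o;
        return bucket == that.bucket && Objects.equals(partition, that.partition);
    }

    @Override
    public int hashCode() {
        return Objects.hash(partition, bucket);
    }

    @Override
    public String toString() {
        return "PartitionBucketKey{partition=" + partition + ", bucket=" + bucket + "}";
    }
}
```

Named accessors such as `partition()` and `bucket()` replace the positional `f0` and `f1` fields, which makes signatures like the one above easier to read.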



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.committer.LakeCommitResult;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.lake.paimon.utils.PaimonDvTableUtils.findLatestSnapshotExactlyHoldingL0Files;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A retriever to retrieve the readable snapshot and offsets for Paimon 
deletion vector enabled
+ * table.
+ */
+public class DvTableReadableSnapshotRetriever implements AutoCloseable {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DvTableReadableSnapshotRetriever.class);
+
+    private final TablePath tablePath;
+    private final long tableId;
+    private final FileStoreTable fileStoreTable;
+    private final Admin flussAdmin;
+    private final Connection flussConnection;
+    private final SnapshotManager snapshotManager;
+
+    public DvTableReadableSnapshotRetriever(
+            TablePath tablePath,
+            long tableId,
+            FileStoreTable paimonFileStoreTable,
+            Configuration flussConfig) {
+        this.tablePath = tablePath;
+        this.tableId = tableId;
+        this.fileStoreTable = paimonFileStoreTable;
+        this.flussConnection = ConnectionFactory.createConnection(flussConfig);
+        this.flussAdmin = flussConnection.getAdmin();
+        this.snapshotManager = fileStoreTable.snapshotManager();
+    }
+
+    /**
+     * Get readable offsets for DV tables based on the latest compacted 
snapshot.
+     *
+     * <p>For Paimon DV tables, when an appended snapshot is committed, we 
need to check the latest
+     * compacted snapshot to determine readable offsets for each bucket. This 
method implements
+     * incremental advancement of readable_snapshot per bucket:
+     *
+     * <ul>
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot. These
+     *       buckets can advance their readable offsets since all their data 
is in base files (L1+).
+     *   <li>For buckets with L0 files: traverse backwards through compacted 
snapshots to find the
+     *       latest one that flushed this bucket's L0 files. Then find the 
latest snapshot that
+     *       exactly holds those flushed L0 files, and use the previous APPEND 
snapshot's offset for
+     *       that bucket.
+     * </ul>
+     *
+     * <p>Algorithm:
+     *
+     * <ol>
+     *   <li>Find the latest compacted snapshot before the given tiered 
snapshot
+     *   <li>Check which buckets have no L0 files and which have L0 files in 
the compacted snapshot
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot (all data is
+     *       in base files, safe to advance)
+     *   <li>For buckets with L0 files:
+     *       <ol>
+     *         <li>Traverse backwards through compacted snapshots starting 
from the latest one
+     *         <li>For each compacted snapshot, check which buckets had their 
L0 files flushed
+     *         <li>For each flushed bucket, find the latest snapshot that 
exactly holds those L0
+     *             files using {@link 
PaimonDvTableUtils#findLatestSnapshotExactlyHoldingL0Files}
+     *         <li>Find the previous APPEND snapshot before that snapshot
+     *         <li>Use that APPEND snapshot's offset for the bucket
+     *       </ol>
+     *   <li>Return readable offsets for all buckets, allowing incremental 
advancement
+     * </ol>
+     *
+     * <p>Note: This allows readable_snapshot to advance incrementally per 
bucket. Each bucket's
+     * readable offset is set to the maximum offset that is actually readable 
in the compacted
+     * snapshot, ensuring no data duplication or loss. The readable_snapshot 
is set to the latest
+     * compacted snapshot ID, and each bucket continues reading from its 
respective readable offset.
+     *
+     * <p>Example: If bucket0's L0 files were flushed in snapshot5 (which 
compacted snapshot1's L0
+     * files), and snapshot4 is the latest snapshot that exactly holds those 
L0 files, then
+     * bucket0's readable offset will be set to snapshot4's previous APPEND 
snapshot's offset.
+     *
+     * @param tieredSnapshotId the tiered snapshot ID (the appended snapshot 
that was just
+     *     committed)
+     * @return a tuple containing the readable snapshot ID (the latest 
compacted snapshot) and a map
+     *     of TableBucket to readable offset for all buckets, or null if:
+     *     <ul>
+     *       <li>No compacted snapshot exists before the tiered snapshot
+     *       <li>Cannot find the latest snapshot holding flushed L0 files for 
some buckets
+     *       <li>Cannot find the previous APPEND snapshot for some buckets
+     *       <li>Cannot find offsets in Fluss for some buckets
+     *     </ul>
+     *     The map contains offsets for ALL buckets, allowing incremental 
advancement.
+     * @throws IOException if an error occurs reading snapshots or offsets 
from Fluss
+     */
+    @Nullable
+    public ReadableSnapshotResult getReadableSnapshotAndOffsets(long 
tieredSnapshotId)
+            throws IOException {
+        // Find the latest compacted snapshot
+        Snapshot latestCompactedSnapshot =
+                findPreviousSnapshot(tieredSnapshotId, 
Snapshot.CommitKind.COMPACT);
+        if (latestCompactedSnapshot == null) {
+            // No compacted snapshot found, may happen when no compaction 
happens or snapshot
+            // expiration, we can't update readable offsets, return null 
directly
+            LOG.info(
+                    "Can't find latest compacted snapshot before snapshot {}, 
skip get readable snapshot.",
+                    tieredSnapshotId);
+            return null;
+        }
+
+        Map<TableBucket, Long> readableOffsets = new HashMap<>();
+
+        FlussTableBucketMapper flussTableBucketMapper = new 
FlussTableBucketMapper();
+
+        // get all the bucket without l0 files and with l0 files
+        Tuple2<Set<PartitionBucket>, Set<PartitionBucket>> 
bucketsWithoutL0AndWithL0 =
+                getBucketsWithAndWithoutL0AndWithL0(latestCompactedSnapshot);
+        Set<PartitionBucket> bucketsWithoutL0 = bucketsWithoutL0AndWithL0.f0;
+        Set<PartitionBucket> bucketsWithL0 = bucketsWithoutL0AndWithL0.f1;
+
+        // Track the earliest previousAppendSnapshot ID that was accessed
+        // This represents the oldest snapshot that might still be needed
+        long earliestSnapshotIdToKeep = LakeCommitResult.KEEP_ALL_PREVIOUS;
+
+        if (!bucketsWithoutL0.isEmpty()) {
+            // Get latest tiered offsets
+            LakeSnapshot latestTieredSnapshot;
+            try {
+                latestTieredSnapshot = 
flussAdmin.getLatestLakeSnapshot(tablePath).get();
+            } catch (Exception e) {
+                throw new IOException(
+                        "Failed to read lake snapshot from Fluss server for 
snapshot "
+                                + latestCompactedSnapshot.id(),
+                        e);
+            }
+            // for all buckets without l0, we can use the latest tiered offsets
+            for (PartitionBucket bucket : bucketsWithoutL0) {
+                TableBucket tableBucket = 
flussTableBucketMapper.toTableBucket(bucket);
+                if (tableBucket == null) {
+                    // can't map such paimon bucket to fluss, just ignore
+                    continue;
+                }
+                readableOffsets.put(
+                        tableBucket, 
latestTieredSnapshot.getTableBucketsOffset().get(tableBucket));
+            }
+        }
+
+        // for all buckets with l0, we need to find the latest compacted 
snapshot which flushed
+        // the buckets, the per-bucket offset should be updated to the 
corresponding compacted
+        // snapshot offsets
+        Set<PartitionBucket> allBucketsToAdvance = new 
HashSet<>(bucketsWithL0);
+
+        long earliestSnapshotId = 
checkNotNull(snapshotManager.earliestSnapshotId());
+        // Traverse snapshots backwards, starting from the latest compacted snapshot
+        for (long currentSnapshotId = latestCompactedSnapshot.id();
+                currentSnapshotId >= earliestSnapshotId;
+                currentSnapshotId--) {
+            // no any buckets to advance, break directly
+            if (allBucketsToAdvance.isEmpty()) {
+                break;
+            }
+            Snapshot currentSnapshot = 
snapshotManager.tryGetSnapshot(currentSnapshotId);
+            if (currentSnapshot == null
+                    || currentSnapshot.commitKind() != 
Snapshot.CommitKind.COMPACT) {
+                continue;
+            }
+            // Get buckets flushed by current compacted snapshot
+            Set<PartitionBucket> flushedBuckets = 
getBucketsWithFlushedL0(currentSnapshot);
+            // For each flushed bucket, if offset not set yet, set it
+            for (PartitionBucket partitionBucket : flushedBuckets) {
+                TableBucket tb = 
flussTableBucketMapper.toTableBucket(partitionBucket);
+                if (tb == null) {
+                    // can't map such paimon bucket to fluss,just ignore
+                    // don't need to advance offset for the bucket
+                    allBucketsToAdvance.remove(partitionBucket);
+                    continue;
+                }
+                if (!readableOffsets.containsKey(tb)) {
+                    Snapshot sourceSnapshot =
+                            findLatestSnapshotExactlyHoldingL0Files(
+                                    fileStoreTable, currentSnapshot);
+                    // it happens if there is a compacted snapshot flush l0 
files for a bucket,
+                    // but the snapshot from which the compacted snapshot 
compact is expired
+                    // it should happen rarely, we can't determine the 
readable offsets for this
+                    // bucket, currently, we just return null to stop readable 
offset advance
+                    // if it happen, compaction should work unexpected, warn 
it and reminds to
+                    // increase snapshot retention
+                    if (sourceSnapshot == null) {
+                        LOG.warn(
+                                "Cannot find snapshot holding L0 files flushed 
by compacted snapshot {} for bucket {}, "
+                                        + "the snapshot may have been expired. 
Consider increasing snapshot retention.",
+                                currentSnapshot.id(),
+                                tb);
+                        return null;
+                    }
+
+                    // we already find that for this bucket, which snapshot do 
the latest flush,
+                    // the offset for the previous one append snapshot should 
be the readable
+                    // offset
+                    Snapshot previousAppendSnapshot =
+                            sourceSnapshot.commitKind() == 
Snapshot.CommitKind.APPEND
+                                    ? sourceSnapshot
+                                    : findPreviousSnapshot(
+                                            sourceSnapshot.id(), 
Snapshot.CommitKind.APPEND);
+
+                    // Can't find previous APPEND snapshot, likely due to 
snapshot expiration.
+                    // This happens when the snapshot holding flushed L0 files 
is a COMPACT
+                    // snapshot,
+                    // and all APPEND snapshots before it have been expired.
+                    //
+                    // TODO: Optimization - Store compacted snapshot offsets 
in Fluss
+                    // Currently, we rely on Paimon to find the previous 
APPEND snapshot to get its
+                    // offset. If Fluss stores offsets for all snapshots 
(including COMPACT
+                    // snapshots),
+                    // we could:
+                    // 1. Use the sourceSnapshot's offset directly if it's 
stored in Fluss
+                    // 2. Find any previous snapshot (COMPACT or APPEND) and 
use its offset
+                    // 3. This would make the system more resilient to 
snapshot expiration
+                    if (previousAppendSnapshot == null) {
+                        LOG.warn(
+                                "Cannot find previous APPEND snapshot before 
snapshot {} for bucket {}. "
+                                        + "This may be due to snapshot 
expiration. Consider increasing paimon snapshot retention.",
+                                sourceSnapshot.id(),
+                                tb);
+                        return null;
+                    }
+
+                    // Track the minimum previousAppendSnapshot ID
+                    // This snapshot will be accessed via getLakeSnapshot, so 
we need to keep it
+                    if (earliestSnapshotIdToKeep <= 0
+                            || previousAppendSnapshot.id() < 
earliestSnapshotIdToKeep) {
+                        earliestSnapshotIdToKeep = previousAppendSnapshot.id();
+                    }
+
+                    try {
+                        LakeSnapshot lakeSnapshot =
+                                flussAdmin
+                                        .getLakeSnapshot(tablePath, 
previousAppendSnapshot.id())
+                                        .get();
+                        Long offset = 
lakeSnapshot.getTableBucketsOffset().get(tb);
+                        if (offset != null) {
+                            readableOffsets.put(tb, offset);
+                            allBucketsToAdvance.remove(partitionBucket);
+                        } else {
+                            LOG.error(
+                                    "Could not find offset for bucket {} in 
snapshot {}, skip advancing readable snapshot.",
+                                    tb,
+                                    previousAppendSnapshot.id());
+                            return null;
+                        }
+                    } catch (Exception e) {
+                        LOG.error(
+                                "Failed to read lake snapshot {} from Fluss 
server, skip update readable snapshot and offset.",
+                                previousAppendSnapshot.id(),
+                                e);
+                        return null;
+                    }
+                }
+            }
+        }
+
+        // This happens when there are writes to a bucket, but no compaction 
has happened for that
+        // bucket from the earliest snapshot to the latest compacted snapshot.
+        // This should happen rarely in practice, as compaction typically 
processes all buckets over
+        // time.
+        //
+        // TODO: Optimization - Handle buckets without flushed L0 files
+        // We can optimize this case in two ways:
+        // 1. If a previous readable snapshot exists between earliest and 
latest snapshot:
+        //    - Reuse the readable snapshot's offset for this bucket (safe 
since no L0 was flushed)
+        // 2. If the earliest snapshot is the first snapshot committed by 
Fluss:
+        //    - Set the readable offset to 0 for this bucket (no data was 
readable before)
+        // These optimizations would allow readable_snapshot to advance even 
when some buckets
+        // haven't been compacted yet, improving overall system progress.
+        if (!allBucketsToAdvance.isEmpty()) {
+            LOG.warn(
+                    "Could not find flushed snapshots for buckets with L0: {}. 
"
+                            + "These buckets have L0 files but no found 
compaction snapshot has flushed them yet."
+                            + " Consider increasing paimon snapshot 
retention.",
+                    allBucketsToAdvance);
+            return null;
+        }
+
+        // get the tiered offsets for the readable snapshot
+        Snapshot previousSnapshot =
+                findPreviousSnapshot(latestCompactedSnapshot.id(), 
Snapshot.CommitKind.APPEND);
+        if (previousSnapshot == null) {
+            LOG.warn(
+                    "Failed to find a previous APPEND snapshot before 
compacted snapshot {} for table {}. "
+                            + "This prevents retrieving baseline offsets from 
Fluss.",
+                    latestCompactedSnapshot.id(),
+                    tablePath);
+            return null;
+        }
+
+        Map<TableBucket, Long> tieredOffsets;
+        try {
+            tieredOffsets =
+                    flussAdmin
+                            .getLakeSnapshot(tablePath, previousSnapshot.id())
+                            .get()
+                            .getTableBucketsOffset();
+        } catch (Exception e) {
+            // we can't get tieredOffsets, just skip it
+            LOG.error(
+                    "Failed to retrieve lake snapshot {} from Fluss server for 
table {}. "
+                            + "The readable snapshot advancement will be 
skipped.",
+                    previousSnapshot.id(),
+                    tablePath,
+                    e);
+            return null;
+        }
+
+        // Return the latest compacted snapshot ID as the unified readable 
snapshot
+        // All buckets can read from this snapshot's base files, then continue 
from their
+        // respective readable offsets
+        // Also return the minimum previousAppendSnapshot ID that was accessed
+        // Snapshots before this ID can potentially be safely deleted from 
Fluss
+        return new ReadableSnapshotResult(
+                latestCompactedSnapshot.id(),
+                tieredOffsets,
+                readableOffsets,
+                earliestSnapshotIdToKeep);
+    }
+
+    /**
+     * Get buckets (with partition info) that have no L0 files in the given 
snapshot.
+     *
+     * <p>For Paimon DV tables, we check the snapshot's data files to 
determine which buckets have
+     * no L0 delta files. A bucket has no L0 files if all its data is in base 
files.
+     *
+     * <p>For partitioned tables, we include partition information in the 
returned TableBucket
+     * objects.
+     *
+     * @param snapshot the snapshot to check
+     * @return set of TableBucket that have no L0 files
+     */
+    private Tuple2<Set<PartitionBucket>, Set<PartitionBucket>> 
getBucketsWithAndWithoutL0AndWithL0(

Review Comment:
   nit: rename to `getBucketsWithoutL0AndWithL0`; the current name `getBucketsWithAndWithoutL0AndWithL0` has a redundant `WithAnd`.
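
For context on why the order in the name matters: the method returns the buckets without L0 files first and the buckets with L0 files second. The following is only a sketch of that split under simplifying assumptions. `FileEntry`, `Split`, and `splitByL0` are hypothetical names, buckets are plain integers without partition information, and no Paimon API is used.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of splitting buckets into "without L0" and "with L0". */
final class L0BucketSplitSketch {

    /** Stand-in for a data file entry in a snapshot: which bucket it is in, and its level. */
    static final class FileEntry {
        final int bucket;
        final int level;

        FileEntry(int bucket, int level) {
            this.bucket = bucket;
            this.level = level;
        }
    }

    /** Result holder; the field order mirrors the suggested method name. */
    static final class Split {
        final Set<Integer> withoutL0;
        final Set<Integer> withL0;

        Split(Set<Integer> withoutL0, Set<Integer> withL0) {
            this.withoutL0 = withoutL0;
            this.withL0 = withL0;
        }
    }

    /** A bucket counts as "with L0" if at least one of its files is at level 0. */
    static Split splitByL0(List<FileEntry> entries) {
        Set<Integer> allBuckets = new TreeSet<>();
        Set<Integer> withL0 = new TreeSet<>();
        for (FileEntry entry : entries) {
            allBuckets.add(entry.bucket);
            if (entry.level == 0) {
                withL0.add(entry.bucket);
            }
        }
        // Every remaining bucket has all of its data in base files (level 1 and above).
        Set<Integer> withoutL0 = new TreeSet<>(allBuckets);
        withoutL0.removeAll(withL0);
        return new Split(withoutL0, withL0);
    }
}
```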



##########
fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotsData.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.entity;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
+import org.apache.fluss.server.zk.data.lake.LakeTable;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The data for request {@link CommitLakeTableSnapshotRequest}. */
+public class CommitLakeTableSnapshotsData {
+
+    private final Map<Long, CommitLakeTableSnapshot> 
commitLakeTableSnapshotByTableId;
+
+    private CommitLakeTableSnapshotsData(
+            Map<Long, CommitLakeTableSnapshot> 
commitLakeTableSnapshotByTableId) {
+        this.commitLakeTableSnapshotByTableId =
+                Collections.unmodifiableMap(commitLakeTableSnapshotByTableId);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Builder for {@link CommitLakeTableSnapshotsData}. */
+    public static class Builder {
+        private final Map<Long, CommitLakeTableSnapshot> snapshotMap = new 
HashMap<>();
+
+        /**
+         * Add a table snapshot entry.
+         *
+         * @param tableId the table ID
+         * @param lakeTableSnapshot the lake table snapshot (for V1 format, 
can be null in V2)
+         * @param tableMaxTieredTimestamps the max tiered timestamps for 
metrics (can be null when
+         *     tiered timestamps is unknown)
+         * @param lakeSnapshotMetadata the lake snapshot metadata (for V2 
format, can be null in v1)
+         * @param earliestSnapshotIDToKeep the earliest snapshot ID to keep 
(can be null in v1 or
+         *     not to keep previous snapshot)
+         */
+        public void addTableSnapshot(
+                long tableId,
+                @Nullable LakeTableSnapshot lakeTableSnapshot,
+                @Nullable Map<TableBucket, Long> tableMaxTieredTimestamps,
+                @Nullable LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata,
+                @Nullable Long earliestSnapshotIDToKeep) {
+            snapshotMap.put(
+                    tableId,
+                    new CommitLakeTableSnapshot(
+                            lakeTableSnapshot,
+                            tableMaxTieredTimestamps != null
+                                    ? tableMaxTieredTimestamps
+                                    : Collections.emptyMap(),
+                            lakeSnapshotMetadata,
+                            earliestSnapshotIDToKeep));
+        }
+
+        /**
+         * Build the {@link CommitLakeTableSnapshotsData} instance.
+         *
+         * @return the built instance
+         */
+        public CommitLakeTableSnapshotsData build() {
+            return new CommitLakeTableSnapshotsData(snapshotMap);
+        }
+    }
+
+    public Map<Long, CommitLakeTableSnapshot> 
getCommitLakeTableSnapshotByTableId() {
+        return commitLakeTableSnapshotByTableId;
+    }
+
+    // Backward compatibility methods
+    public Map<Long, LakeTableSnapshot> getLakeTableSnapshot() {
+        return commitLakeTableSnapshotByTableId.entrySet().stream()
+                .filter(entry -> entry.getValue().lakeTableSnapshot != null)
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey, entry -> 
entry.getValue().lakeTableSnapshot));
+    }
+
+    public Map<Long, Map<TableBucket, Long>> getTableMaxTieredTimestamps() {
+        return commitLakeTableSnapshotByTableId.entrySet().stream()
+                .filter(
+                        entry ->
+                                entry.getValue().tableMaxTieredTimestamps != 
null
+                                        && 
!entry.getValue().tableMaxTieredTimestamps.isEmpty())
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey,
+                                entry -> 
entry.getValue().tableMaxTieredTimestamps));
+    }
+
+    public Map<Long, LakeTable.LakeSnapshotMetadata> 
getLakeTableSnapshotMetadatas() {
+        return commitLakeTableSnapshotByTableId.entrySet().stream()
+                .filter(entry -> entry.getValue().lakeSnapshotMetadata != null)
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey, entry -> 
entry.getValue().lakeSnapshotMetadata));
+    }
+
+    /**
+     * Data container for a single table's lake snapshot commit.
+     *
+     * <p>This class bridges legacy V1 reporting (used for metrics) and the V2 
snapshot metadata
+     * persistence required for the tiering service.
+     */
+    public static class CommitLakeTableSnapshot {
+
+        /**
+         * Since 0.9, this field is only used to allow the coordinator to send 
requests to tablet
+         * servers, enabling tablet servers to report metrics about 
synchronized log end offsets. In
+         * the future, we plan to have the tiering service directly report 
metrics, and this field
+         * will be removed.
+         */
+        @Nullable private final LakeTableSnapshot lakeTableSnapshot;
+
+        /**
+         * Since 0.9, this field is only used to allow the coordinator to send 
requests to tablet
+         * servers, enabling tablet servers to report metrics about max tiered 
timestamps. In the
+         * future, we plan to have the tiering service directly report 
metrics, and this field will
+         * be removed.
+         */
+        @Nullable private final Map<TableBucket, Long> 
tableMaxTieredTimestamps;
+
+        // the following field only non-empty since 0.9
+        @Nullable private final LakeTable.LakeSnapshotMetadata 
lakeSnapshotMetadata;
+
+        // The earliest snapshot ID to keep for DV tables. Null for non-DV 
tables.

Review Comment:
   ```suggestion
           // The earliest snapshot ID to keep for Paimon DV tables. Null for non-Paimon-DV tables.
   ```



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.committer.LakeCommitResult;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.lake.paimon.utils.PaimonDvTableUtils.findLatestSnapshotExactlyHoldingL0Files;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A retriever to retrieve the readable snapshot and offsets for Paimon 
deletion vector enabled
+ * table.
+ */
+public class DvTableReadableSnapshotRetriever implements AutoCloseable {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DvTableReadableSnapshotRetriever.class);
+
+    private final TablePath tablePath;
+    private final long tableId;
+    private final FileStoreTable fileStoreTable;
+    private final Admin flussAdmin;
+    private final Connection flussConnection;
+    private final SnapshotManager snapshotManager;
+
+    public DvTableReadableSnapshotRetriever(
+            TablePath tablePath,
+            long tableId,
+            FileStoreTable paimonFileStoreTable,
+            Configuration flussConfig) {
+        this.tablePath = tablePath;
+        this.tableId = tableId;
+        this.fileStoreTable = paimonFileStoreTable;
+        this.flussConnection = ConnectionFactory.createConnection(flussConfig);
+        this.flussAdmin = flussConnection.getAdmin();
+        this.snapshotManager = fileStoreTable.snapshotManager();
+    }
+
+    /**
+     * Get readable offsets for DV tables based on the latest compacted 
snapshot.
+     *
+     * <p>For Paimon DV tables, when an appended snapshot is committed, we 
need to check the latest
+     * compacted snapshot to determine readable offsets for each bucket. This 
method implements
+     * incremental advancement of readable_snapshot per bucket:
+     *
+     * <ul>
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot. These
+     *       buckets can advance their readable offsets since all their data 
is in base files (L1+).
+     *   <li>For buckets with L0 files: traverse backwards through compacted 
snapshots to find the
+     *       latest one that flushed this bucket's L0 files. Then find the 
latest snapshot that
+     *       exactly holds those flushed L0 files, and use the previous APPEND 
snapshot's offset for
+     *       that bucket.
+     * </ul>
+     *
+     * <p>Algorithm:
+     *
+     * <ol>
+     *   <li>Find the latest compacted snapshot before the given tiered 
snapshot
+     *   <li>Check which buckets have no L0 files and which have L0 files in 
the compacted snapshot
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot (all data is
+     *       in base files, safe to advance)
+     *   <li>For buckets with L0 files:
+     *       <ol>
+     *         <li>Traverse backwards through compacted snapshots starting 
from the latest one
+     *         <li>For each compacted snapshot, check which buckets had their 
L0 files flushed
+     *         <li>For each flushed bucket, find the latest snapshot that 
exactly holds those L0
+     *             files using {@link 
PaimonDvTableUtils#findLatestSnapshotExactlyHoldingL0Files}
+     *         <li>Find the previous APPEND snapshot before that snapshot
+     *         <li>Use that APPEND snapshot's offset for the bucket
+     *       </ol>
+     *   <li>Return readable offsets for all buckets, allowing incremental 
advancement
+     * </ol>
+     *
+     * <p>Note: This allows readable_snapshot to advance incrementally per 
bucket. Each bucket's
+     * readable offset is set to the maximum offset that is actually readable 
in the compacted
+     * snapshot, ensuring no data duplication or loss. The readable_snapshot 
is set to the latest
+     * compacted snapshot ID, and each bucket continues reading from its 
respective readable offset.
+     *
+     * <p>Example: If bucket0's L0 files were flushed in snapshot5 (which 
compacted snapshot1's L0
+     * files), and snapshot4 is the latest snapshot that exactly holds those 
L0 files, then
+     * bucket0's readable offset will be set to snapshot4's previous APPEND 
snapshot's offset.
+     *
+     * @param tieredSnapshotId the tiered snapshot ID (the appended snapshot 
that was just
+     *     committed)
+     * @return a tuple containing the readable snapshot ID (the latest 
compacted snapshot) and a map
+     *     of TableBucket to readable offset for all buckets, or null if:
+     *     <ul>
+     *       <li>No compacted snapshot exists before the tiered snapshot
+     *       <li>Cannot find the latest snapshot holding flushed L0 files for 
some buckets
+     *       <li>Cannot find the previous APPEND snapshot for some buckets
+     *       <li>Cannot find offsets in Fluss for some buckets
+     *     </ul>
+     *     The map contains offsets for ALL buckets, allowing incremental 
advancement.
+     * @throws IOException if an error occurs reading snapshots or offsets 
from Fluss
+     */
+    @Nullable
+    public ReadableSnapshotResult getReadableSnapshotAndOffsets(long 
tieredSnapshotId)
+            throws IOException {
+        // Find the latest compacted snapshot
+        Snapshot latestCompactedSnapshot =
+                findPreviousSnapshot(tieredSnapshotId, 
Snapshot.CommitKind.COMPACT);
+        if (latestCompactedSnapshot == null) {
+            // No compacted snapshot found, may happen when no compaction 
happens or snapshot
+            // expiration, we can't update readable offsets, return null 
directly
+            LOG.info(
+                    "Can't find latest compacted snapshot before snapshot {}, 
skip get readable snapshot.",
+                    tieredSnapshotId);
+            return null;
+        }
+
+        Map<TableBucket, Long> readableOffsets = new HashMap<>();
+
+        FlussTableBucketMapper flussTableBucketMapper = new 
FlussTableBucketMapper();
+
+        // get all the bucket without l0 files and with l0 files
+        Tuple2<Set<PartitionBucket>, Set<PartitionBucket>> 
bucketsWithoutL0AndWithL0 =
+                getBucketsWithAndWithoutL0AndWithL0(latestCompactedSnapshot);
+        Set<PartitionBucket> bucketsWithoutL0 = bucketsWithoutL0AndWithL0.f0;
+        Set<PartitionBucket> bucketsWithL0 = bucketsWithoutL0AndWithL0.f1;
+
+        // Track the earliest previousAppendSnapshot ID that was accessed
+        // This represents the oldest snapshot that might still be needed
+        long earliestSnapshotIdToKeep = LakeCommitResult.KEEP_ALL_PREVIOUS;
+
+        if (!bucketsWithoutL0.isEmpty()) {
+            // Get latest tiered offsets
+            LakeSnapshot latestTieredSnapshot;
+            try {
+                latestTieredSnapshot = 
flussAdmin.getLatestLakeSnapshot(tablePath).get();

Review Comment:
   nit: should we check table ID consistency here? The table may have been
   re-created and therefore have a different table ID.
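   
   A minimal sketch of such a guard, assuming the snapshot returned by Fluss
   exposes the ID of the table it belongs to. `SnapshotInfo`, `TableIdGuard`
   and `checkSameTable` are hypothetical names used for illustration only, not
   existing Fluss APIs, and the unchecked exception is just one option (the
   method could also log and return null):

```java
/** Hypothetical stand-in for the snapshot metadata returned by Fluss. */
final class SnapshotInfo {
    final long tableId;
    final long snapshotId;

    SnapshotInfo(long tableId, long snapshotId) {
        this.tableId = tableId;
        this.snapshotId = snapshotId;
    }
}

/** Hypothetical helper; not part of the PR. */
final class TableIdGuard {
    private TableIdGuard() {}

    /**
     * Returns the snapshot unchanged if it belongs to the expected table, and fails
     * fast otherwise, e.g. when the table was dropped and re-created under the same
     * path with a new table ID.
     */
    static SnapshotInfo checkSameTable(long expectedTableId, SnapshotInfo snapshot) {
        if (snapshot.tableId != expectedTableId) {
            throw new IllegalStateException(
                    "Table ID mismatch: expected "
                            + expectedTableId
                            + " but the snapshot belongs to table "
                            + snapshot.tableId
                            + "; the table may have been re-created.");
        }
        return snapshot;
    }
}
```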



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:
##########
@@ -71,28 +76,94 @@ public void registerLakeTableSnapshotV1(long tableId, 
LakeTableSnapshot lakeTabl
 
     public void registerLakeTableSnapshotV2(
             long tableId, LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata) 
throws Exception {
-        Optional<LakeTable> optPreviousLakeTable = 
zkClient.getLakeTable(tableId);
-        List<LakeTable.LakeSnapshotMetadata> previousLakeSnapshotMetadatas = 
null;
-        if (optPreviousLakeTable.isPresent()) {
-            previousLakeSnapshotMetadatas = 
optPreviousLakeTable.get().getLakeSnapshotMetadatas();
-        }
-        LakeTable lakeTable = new LakeTable(lakeSnapshotMetadata);
+        registerLakeTableSnapshotV2(tableId, lakeSnapshotMetadata, null);
+    }
+
+    /**
+     * Register a lake table snapshot and clean up old snapshots based on the 
table type.
+     *
+     * @param tableId the table ID
+     * @param lakeSnapshotMetadata the new snapshot metadata to register
+     * @param earliestSnapshotIDToKeep the earliest snapshot ID to keep. If 
null, only the latest

Review Comment:
   Add a comment explaining what `-1` means.



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonDvTableUtils.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/** Utils for Paimon delete-vector enabled table. */
+public class PaimonDvTableUtils {
+
+    /**
+     * Find the latest snapshot that still holds the L0 files flushed by the 
given compacted
+     * snapshot.
+     *
+     * <p>The method works by:
+     *
+     * <ol>
+     *   <li>Getting the delta of the compacted snapshot to find deleted L0 
files
+     *   <li>Grouping deleted L0 files by bucket
+     *   <li>Searching backwards through previous snapshots to find the latest 
one whose L0 files
+     *       exactly match the deleted L0 files for each bucket
+     * </ol>
+     *
+     * <p>This snapshot is the most recent snapshot that still contains all 
the L0 files that were
+     * flushed (deleted) by the compacted snapshot.
+     *
+     * @param fileStoreTable the FileStoreTable instance
+     * @param compactedSnapshot the compacted snapshot whose flushed L0 files 
to search for
+     * @return the latest snapshot that still holds these L0 files, or null if 
not found
+     * @throws IOException if an error occurs
+     */
+    @Nullable
+    public static Snapshot findLatestSnapshotExactlyHoldingL0Files(
+            FileStoreTable fileStoreTable, Snapshot compactedSnapshot) throws 
IOException {
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+        checkState(compactedSnapshot.commitKind() == 
Snapshot.CommitKind.COMPACT);
+        // Get deleted L0 files from the compacted snapshot's delta
+        Map<Tuple2<BinaryRow, Integer>, Set<String>> deletedL0FilesByBucket =
+                getDeletedL0FilesByBucket(fileStoreTable, compactedSnapshot);
+
+        if (deletedL0FilesByBucket.isEmpty()) {
+            // No L0 files were deleted, can't find a snapshot holding these 
L0 files,
+            // return null directly
+            return null;
+        }
+
+        // Search backwards from the compacted snapshot to find the latest 
snapshot
+        // that still holds these L0 files
+        long earliestSnapshot = 
checkNotNull(snapshotManager.earliestSnapshotId());
+        for (long snapshot = compactedSnapshot.id() - 1; snapshot >= 
earliestSnapshot; snapshot--) {
+            Snapshot candidateSnapshot = 
snapshotManager.tryGetSnapshot(snapshot);
+            if (candidateSnapshot == null) {
+                // no such snapshot in paimon, skip
+                continue;
+            }
+            if (matchesDeletedL0Files(fileStoreTable, candidateSnapshot, 
deletedL0FilesByBucket)) {
+                return candidateSnapshot;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Get deleted L0 files grouped by bucket from a compacted snapshot's 
delta.
+     *
+     * @param compactedSnapshot the compacted snapshot
+     * @return a map from bucket ID to set of deleted L0 file names
+     */
+    private static Map<Tuple2<BinaryRow, Integer>, Set<String>> 
getDeletedL0FilesByBucket(
+            FileStoreTable fileStoreTable, Snapshot compactedSnapshot) {
+        Map<Tuple2<BinaryRow, Integer>, Set<String>> deletedL0FilesByBucket = 
new HashMap<>();
+        List<ManifestEntry> manifestEntries =
+                fileStoreTable
+                        .store()
+                        .newScan()
+                        .withSnapshot(compactedSnapshot.id())
+                        .withKind(ScanMode.DELTA)
+                        .plan()
+                        .files(FileKind.DELETE);
+        for (ManifestEntry manifestEntry : manifestEntries) {
+            if (manifestEntry.level() == 0) {
+                deletedL0FilesByBucket
+                        .computeIfAbsent(
+                                Tuple2.of(manifestEntry.partition(), 
manifestEntry.bucket()),
+                                k -> new HashSet<>())
+                        .add(manifestEntry.fileName());
+            }
+        }
+        return deletedL0FilesByBucket;
+    }
+
+    /**
+     * Check if a candidate snapshot's L0 files exactly match the deleted L0 
files for the relevant
+     * buckets.
+     *
+     * @param candidateSnapshot the candidate snapshot to check
+     * @param deletedL0FilesByBucket the deleted L0 files grouped by bucket
+     * @return true if the candidate snapshot's L0 files match the deleted L0 
files per-bucket
+     */
+    private static boolean matchesDeletedL0Files(
+            FileStoreTable fileStoreTable,
+            Snapshot candidateSnapshot,
+            Map<Tuple2<BinaryRow, Integer>, Set<String>> 
deletedL0FilesByBucket) {
+        // Get L0 files from the candidate snapshot, grouped by bucket
+        Map<Tuple2<BinaryRow, Integer>, Set<String>> candidateL0FilesByBucket =
+                getL0FilesByBucket(fileStoreTable, candidateSnapshot);
+
+        for (Map.Entry<Tuple2<BinaryRow, Integer>, Set<String>> deleteL0Entry :
+                deletedL0FilesByBucket.entrySet()) {
+            Set<String> deleteL0Files = 
candidateL0FilesByBucket.get(deleteL0Entry.getKey());
+            if (deleteL0Files == null || 
!deleteL0Files.equals(deleteL0Entry.getValue())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Get L0 files from a snapshot, grouped by bucket.
+     *
+     * <p>This method uses the scan API to get all L0 files that exist in the 
snapshot.
+     *
+     * @param snapshot the snapshot to get L0 files from
+     * @return a map from bucket ID to set of L0 file names that exist in the 
snapshot
+     */
+    private static Map<Tuple2<BinaryRow, Integer>, Set<String>> 
getL0FilesByBucket(

Review Comment:
   Use `PaimonPartitionBucket` (the
   `DvTableReadableSnapshotRetriever.PartitionBucket`) instead of `Tuple2` to
   simplify this.
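   
   A rough sketch of what a dedicated key type buys over `Tuple2`, under the
   assumption that the key only needs value-based `equals`/`hashCode`.
   `PartitionBucketKey` and `L0FilesByBucket` are hypothetical names, and the
   partition is simplified to a `String` here rather than Paimon's `BinaryRow`:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical key type; the partition is simplified to a String. */
final class PartitionBucketKey {
    final String partition; // null for non-partitioned tables
    final int bucket;

    PartitionBucketKey(String partition, int bucket) {
        this.partition = partition;
        this.bucket = bucket;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PartitionBucketKey)) {
            return false;
        }
        PartitionBucketKey that = (PartitionBucketKey) o;
        return bucket == that.bucket && Objects.equals(partition, that.partition);
    }

    @Override
    public int hashCode() {
        return Objects.hash(partition, bucket);
    }
}

/** Groups L0 file names per (partition, bucket) without exposing a tuple type. */
final class L0FilesByBucket {
    private final Map<PartitionBucketKey, Set<String>> files = new HashMap<>();

    void add(String partition, int bucket, String fileName) {
        files.computeIfAbsent(new PartitionBucketKey(partition, bucket), k -> new HashSet<>())
                .add(fileName);
    }

    Set<String> get(String partition, int bucket) {
        return files.getOrDefault(
                new PartitionBucketKey(partition, bucket), Collections.<String>emptySet());
    }
}
```

   Method signatures then read as `Map<PartitionBucketKey, Set<String>>`
   rather than `Map<Tuple2<BinaryRow, Integer>, Set<String>>`.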



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java:
##########
@@ -121,21 +126,92 @@ String prepareLakeSnapshot(
         }
     }
 
-    void commit(
+    public void commit(

Review Comment:
   This is not used by production code; can it be removed? It appears to
   duplicate
   `org.apache.fluss.flink.tiering.committer.TieringCommitOperator#commitToFluss`.



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java:
##########
@@ -97,7 +97,19 @@ private LakeTable(
     @Nullable
     public LakeSnapshotMetadata getLatestLakeSnapshotMetadata() {
         if (lakeSnapshotMetadatas != null && !lakeSnapshotMetadatas.isEmpty()) 
{
-            return lakeSnapshotMetadatas.get(0);
+            return lakeSnapshotMetadatas.get(lakeSnapshotMetadatas.size() - 1);

Review Comment:
   Currently, 
`org.apache.fluss.flink.tiering.committer.TieringCommitOperator#commitToFluss` 
may commit both the readable snapshot and the committed snapshot 
simultaneously, but this operation is **not atomic**. As a result, there’s a 
risk that users might observe the **readable snapshot as the latest snapshot**, 
even though it was committed long before the actual latest (committed) 
snapshot. This leads to incorrect visibility semantics.
   
   To address this, we should sort the `lakeSnapshotMetadatas` list by snapshot 
timestamp and ensure only the **most recent snapshot** (i.e., the last one 
after sorting) is treated as the effective committed snapshot for external 
visibility.
   
   Since this is a rare edge case, we can add a `TODO` with a reference to a 
tracking issue and defer the fix to a follow-up.
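   
   For the follow-up, the selection could look roughly like the sketch below.
   It assumes each metadata entry carries a commit timestamp, which is not
   confirmed by this diff; `SnapshotMeta`, `commitTimestampMillis` and
   `LatestSnapshotSelector` are hypothetical names. Taking the maximum by
   timestamp is equivalent to sorting and picking the last element:

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for a lake snapshot metadata entry. */
final class SnapshotMeta {
    final long snapshotId;
    final long commitTimestampMillis; // assumed field, not confirmed by the PR

    SnapshotMeta(long snapshotId, long commitTimestampMillis) {
        this.snapshotId = snapshotId;
        this.commitTimestampMillis = commitTimestampMillis;
    }
}

final class LatestSnapshotSelector {
    private LatestSnapshotSelector() {}

    /**
     * Returns the entry with the greatest commit timestamp, independent of the order
     * in which the entries were registered, or null if there are no entries.
     */
    static SnapshotMeta latest(List<SnapshotMeta> metadatas) {
        if (metadatas == null || metadatas.isEmpty()) {
            return null;
        }
        return Collections.max(
                metadatas,
                Comparator.comparingLong((SnapshotMeta m) -> m.commitTimestampMillis));
    }
}
```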



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.committer.LakeCommitResult;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.lake.paimon.utils.PaimonDvTableUtils.findLatestSnapshotExactlyHoldingL0Files;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A retriever to retrieve the readable snapshot and offsets for Paimon 
deletion vector enabled
+ * table.
+ */
+public class DvTableReadableSnapshotRetriever implements AutoCloseable {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DvTableReadableSnapshotRetriever.class);
+
+    private final TablePath tablePath;
+    private final long tableId;
+    private final FileStoreTable fileStoreTable;
+    private final Admin flussAdmin;
+    private final Connection flussConnection;
+    private final SnapshotManager snapshotManager;
+
+    public DvTableReadableSnapshotRetriever(
+            TablePath tablePath,
+            long tableId,
+            FileStoreTable paimonFileStoreTable,
+            Configuration flussConfig) {
+        this.tablePath = tablePath;
+        this.tableId = tableId;
+        this.fileStoreTable = paimonFileStoreTable;
+        this.flussConnection = ConnectionFactory.createConnection(flussConfig);
+        this.flussAdmin = flussConnection.getAdmin();
+        this.snapshotManager = fileStoreTable.snapshotManager();
+    }
+
+    /**
+     * Get readable offsets for DV tables based on the latest compacted 
snapshot.
+     *
+     * <p>For Paimon DV tables, when an appended snapshot is committed, we 
need to check the latest
+     * compacted snapshot to determine readable offsets for each bucket. This 
method implements
+     * incremental advancement of readable_snapshot per bucket:
+     *
+     * <ul>
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot. These
+     *       buckets can advance their readable offsets since all their data 
is in base files (L1+).
+     *   <li>For buckets with L0 files: traverse backwards through compacted 
snapshots to find the
+     *       latest one that flushed this bucket's L0 files. Then find the 
latest snapshot that
+     *       exactly holds those flushed L0 files, and use the previous APPEND 
snapshot's offset for
+     *       that bucket.
+     * </ul>
+     *
+     * <p>Algorithm:
+     *
+     * <ol>
+     *   <li>Find the latest compacted snapshot before the given tiered 
snapshot
+     *   <li>Check which buckets have no L0 files and which have L0 files in 
the compacted snapshot
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot (all data is
+     *       in base files, safe to advance)
+     *   <li>For buckets with L0 files:
+     *       <ol>
+     *         <li>Traverse backwards through compacted snapshots starting 
from the latest one
+     *         <li>For each compacted snapshot, check which buckets had their 
L0 files flushed
+     *         <li>For each flushed bucket, find the latest snapshot that 
exactly holds those L0
+     *             files using {@link 
PaimonDvTableUtils#findLatestSnapshotExactlyHoldingL0Files}
+     *         <li>Find the previous APPEND snapshot before that snapshot
+     *         <li>Use that APPEND snapshot's offset for the bucket
+     *       </ol>
+     *   <li>Return readable offsets for all buckets, allowing incremental 
advancement
+     * </ol>
+     *
+     * <p>Note: This allows readable_snapshot to advance incrementally per 
bucket. Each bucket's
+     * readable offset is set to the maximum offset that is actually readable 
in the compacted
+     * snapshot, ensuring no data duplication or loss. The readable_snapshot 
is set to the latest
+     * compacted snapshot ID, and each bucket continues reading from its 
respective readable offset.
+     *
+     * <p>Example: If bucket0's L0 files were flushed in snapshot5 (which 
compacted snapshot1's L0
+     * files), and snapshot4 is the latest snapshot that exactly holds those 
L0 files, then
+     * bucket0's readable offset will be set to snapshot4's previous APPEND 
snapshot's offset.
+     *
+     * @param tieredSnapshotId the tiered snapshot ID (the appended snapshot 
that was just
+     *     committed)
+     * @return a tuple containing the readable snapshot ID (the latest 
compacted snapshot) and a map
+     *     of TableBucket to readable offset for all buckets, or null if:
+     *     <ul>
+     *       <li>No compacted snapshot exists before the tiered snapshot
+     *       <li>Cannot find the latest snapshot holding flushed L0 files for 
some buckets
+     *       <li>Cannot find the previous APPEND snapshot for some buckets
+     *       <li>Cannot find offsets in Fluss for some buckets
+     *     </ul>
+     *     The map contains offsets for ALL buckets, allowing incremental 
advancement.
+     * @throws IOException if an error occurs reading snapshots or offsets 
from Fluss
+     */
+    @Nullable
+    public ReadableSnapshotResult getReadableSnapshotAndOffsets(long 
tieredSnapshotId)
+            throws IOException {
+        // Find the latest compacted snapshot
+        Snapshot latestCompactedSnapshot =
+                findPreviousSnapshot(tieredSnapshotId, 
Snapshot.CommitKind.COMPACT);
+        if (latestCompactedSnapshot == null) {
+            // No compacted snapshot found, may happen when no compaction 
happens or snapshot
+            // expiration, we can't update readable offsets, return null 
directly
+            LOG.info(
+                    "Can't find latest compacted snapshot before snapshot {}, 
skip get readable snapshot.",
+                    tieredSnapshotId);
+            return null;
+        }
+
+        Map<TableBucket, Long> readableOffsets = new HashMap<>();
+
+        FlussTableBucketMapper flussTableBucketMapper = new 
FlussTableBucketMapper();
+
+        // get all the bucket without l0 files and with l0 files
+        Tuple2<Set<PartitionBucket>, Set<PartitionBucket>> 
bucketsWithoutL0AndWithL0 =
+                getBucketsWithAndWithoutL0AndWithL0(latestCompactedSnapshot);
+        Set<PartitionBucket> bucketsWithoutL0 = bucketsWithoutL0AndWithL0.f0;
+        Set<PartitionBucket> bucketsWithL0 = bucketsWithoutL0AndWithL0.f1;
+
+        // Track the earliest previousAppendSnapshot ID that was accessed
+        // This represents the oldest snapshot that might still be needed
+        long earliestSnapshotIdToKeep = LakeCommitResult.KEEP_ALL_PREVIOUS;
+
+        if (!bucketsWithoutL0.isEmpty()) {
+            // Get latest tiered offsets
+            LakeSnapshot latestTieredSnapshot;
+            try {
+                latestTieredSnapshot = 
flussAdmin.getLatestLakeSnapshot(tablePath).get();
+            } catch (Exception e) {
+                throw new IOException(
+                        "Failed to read lake snapshot from Fluss server for 
snapshot "
+                                + latestCompactedSnapshot.id(),
+                        e);
+            }
+            // for all buckets without l0, we can use the latest tiered offsets
+            for (PartitionBucket bucket : bucketsWithoutL0) {
+                TableBucket tableBucket = 
flussTableBucketMapper.toTableBucket(bucket);
+                if (tableBucket == null) {
+                    // can't map such paimon bucket to fluss, just ignore
+                    continue;
+                }
+                readableOffsets.put(
+                        tableBucket, 
latestTieredSnapshot.getTableBucketsOffset().get(tableBucket));
+            }
+        }
+
+        // for all buckets with l0, we need to find the latest compacted 
snapshot which flushed
+        // the buckets, the per-bucket offset should be updated to the 
corresponding compacted
+        // snapshot offsets
+        Set<PartitionBucket> allBucketsToAdvance = new 
HashSet<>(bucketsWithL0);
+
+        long earliestSnapshotId = 
checkNotNull(snapshotManager.earliestSnapshotId());
+        // From latestCompacted forward traverse compacted snapshots
+        for (long currentSnapshotId = latestCompactedSnapshot.id();
+                currentSnapshotId >= earliestSnapshotId;
+                currentSnapshotId--) {
+            // no any buckets to advance, break directly
+            if (allBucketsToAdvance.isEmpty()) {
+                break;
+            }
+            Snapshot currentSnapshot = 
snapshotManager.tryGetSnapshot(currentSnapshotId);
+            if (currentSnapshot == null
+                    || currentSnapshot.commitKind() != 
Snapshot.CommitKind.COMPACT) {
+                continue;
+            }
+            // Get buckets flushed by current compacted snapshot
+            Set<PartitionBucket> flushedBuckets = 
getBucketsWithFlushedL0(currentSnapshot);
+            // For each flushed bucket, if offset not set yet, set it
+            for (PartitionBucket partitionBucket : flushedBuckets) {
+                TableBucket tb = 
flussTableBucketMapper.toTableBucket(partitionBucket);
+                if (tb == null) {
+                    // can't map such paimon bucket to fluss,just ignore
+                    // don't need to advance offset for the bucket
+                    allBucketsToAdvance.remove(partitionBucket);
+                    continue;
+                }
+                if (!readableOffsets.containsKey(tb)) {
+                    Snapshot sourceSnapshot =
+                            findLatestSnapshotExactlyHoldingL0Files(
+                                    fileStoreTable, currentSnapshot);
+                    // it happens if there is a compacted snapshot flush l0 
files for a bucket,
+                    // but the snapshot from which the compacted snapshot 
compact is expired
+                    // it should happen rarely, we can't determine the 
readable offsets for this
+                    // bucket, currently, we just return null to stop readable 
offset advance
+                    // if it happen, compaction should work unexpected, warn 
it and reminds to
+                    // increase snapshot retention
+                    if (sourceSnapshot == null) {
+                        LOG.warn(
+                                "Cannot find snapshot holding L0 files flushed 
by compacted snapshot {} for bucket {}, "
+                                        + "the snapshot may have been expired. 
Consider increasing snapshot retention.",
+                                currentSnapshot.id(),
+                                tb);
+                        return null;
+                    }
+
+                    // we already find that for this bucket, which snapshot do 
the latest flush,
+                    // the offset for the previous one append snapshot should 
be the readable
+                    // offset
+                    Snapshot previousAppendSnapshot =
+                            sourceSnapshot.commitKind() == 
Snapshot.CommitKind.APPEND
+                                    ? sourceSnapshot
+                                    : findPreviousSnapshot(
+                                            sourceSnapshot.id(), 
Snapshot.CommitKind.APPEND);
+
+                    // Can't find previous APPEND snapshot, likely due to 
snapshot expiration.
+                    // This happens when the snapshot holding flushed L0 files 
is a COMPACT
+                    // snapshot,
+                    // and all APPEND snapshots before it have been expired.
+                    //
+                    // TODO: Optimization - Store compacted snapshot offsets 
in Fluss
+                    // Currently, we rely on Paimon to find the previous 
APPEND snapshot to get its
+                    // offset. If Fluss stores offsets for all snapshots 
(including COMPACT
+                    // snapshots),
+                    // we could:
+                    // 1. Use the sourceSnapshot's offset directly if it's 
stored in Fluss
+                    // 2. Find any previous snapshot (COMPACT or APPEND) and 
use its offset
+                    // 3. This would make the system more resilient to 
snapshot expiration
+                    if (previousAppendSnapshot == null) {
+                        LOG.warn(
+                                "Cannot find previous APPEND snapshot before 
snapshot {} for bucket {}. "
+                                        + "This may be due to snapshot 
expiration. Consider increasing paimon snapshot retention.",
+                                sourceSnapshot.id(),
+                                tb);
+                        return null;
+                    }
+
+                    // Track the minimum previousAppendSnapshot ID
+                    // This snapshot will be accessed via getLakeSnapshot, so 
we need to keep it
+                    if (earliestSnapshotIdToKeep <= 0
+                            || previousAppendSnapshot.id() < 
earliestSnapshotIdToKeep) {
+                        earliestSnapshotIdToKeep = previousAppendSnapshot.id();
+                    }
+
+                    try {
+                        LakeSnapshot lakeSnapshot =
+                                flussAdmin
+                                        .getLakeSnapshot(tablePath, 
previousAppendSnapshot.id())
+                                        .get();

Review Comment:
   This is invoked for every bucket. If we have a large number of buckets
   (like 10K), this will cost a lot of time. I think we can use a simple
   HashMap to cache the snapshot information, because most of the buckets
   should request the same snapshot ID.
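   
   Something along these lines, as a sketch only: `SnapshotCache` is a
   hypothetical helper, and the loader stands in for the
   `flussAdmin.getLakeSnapshot(tablePath, snapshotId).get()` call, whose
   exception handling is left out here:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

/**
 * Hypothetical per-invocation cache keyed by snapshot ID, so that buckets sharing
 * the same snapshot trigger only one remote lookup.
 */
final class SnapshotCache<V> {
    private final Map<Long, V> cache = new HashMap<>();
    private final LongFunction<V> loader;

    SnapshotCache(LongFunction<V> loader) {
        this.loader = loader;
    }

    V get(long snapshotId) {
        V cached = cache.get(snapshotId);
        if (cached == null) {
            // First request for this snapshot ID: load once and remember the result.
            cached = loader.apply(snapshotId);
            cache.put(snapshotId, cached);
        }
        return cached;
    }
}
```

   The cache only needs to live for one `getReadableSnapshotAndOffsets` call,
   so a plain `HashMap` without eviction should be enough.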



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -429,21 +413,67 @@ CompletableFuture<KvSnapshotMetadata> 
getKvSnapshotMetadata(
             TableBucket bucket, long snapshotId);
 
     /**
-     * Get table lake snapshot info of the given table asynchronously.
+     * Retrieves the absolute latest lake snapshot metadata for a table 
asynchronously.
      *
-     * <p>It'll get the latest snapshot for all the buckets of the table.
+     * <p>This returns the most recent snapshot regardless of its visibility 
or compaction status.
+     * It includes the latest tiered offsets for all buckets.
      *
-     * <p>The following exceptions can be anticipated when calling {@code 
get()} on returned future.
+     * <p>Exceptions expected when calling {@code get()} on the returned 
future:
      *
      * <ul>
-     *   <li>{@link TableNotExistException} if the table does not exist.
-     *   <li>{@link LakeTableSnapshotNotExistException} if no any lake 
snapshot exist.
+     *   <li>{@link TableNotExistException}: If the table does not exist.
+     *   <li>{@link LakeTableSnapshotNotExistException}: If no any snapshots.
      * </ul>
      *
-     * @param tablePath the table path of the table.
+     * @param tablePath The path of the target table.
+     * @return A future returning the latest tiered snapshot.
      */
     CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath);

Review Comment:
   Please add a prominent **NOTE** in the Javadoc clearly warning that this API 
is **not intended for union reads**. Instead, users should use 
`getReadableLakeSnapshot()`. This method should be considered an internal API 
now.
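
   One possible wording for the note, as a sketch only. `TablePath` and 
`LakeSnapshot` below are empty stand-ins so the snippet compiles on its own, 
and `AdminSketch` is a hypothetical name; only the Javadoc text is the point.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in types for illustration; the real classes live in the Fluss client.
final class TablePath {}

final class LakeSnapshot {}

interface AdminSketch {
    /**
     * Retrieves the absolute latest lake snapshot metadata for a table asynchronously.
     *
     * <p><b>NOTE:</b> This API is <b>not intended for union reads</b>. It returns the most
     * recent snapshot regardless of its visibility or compaction status. For union reads,
     * use {@code getReadableLakeSnapshot()} instead. This method should be considered an
     * internal API.
     *
     * @param tablePath The path of the target table.
     * @return A future returning the latest tiered snapshot.
     */
    CompletableFuture<LakeSnapshot> getLatestLakeSnapshot(TablePath tablePath);
}
```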



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.utils;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.committer.LakeCommitResult;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.lake.paimon.utils.PaimonDvTableUtils.findLatestSnapshotExactlyHoldingL0Files;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A retriever to retrieve the readable snapshot and offsets for Paimon 
deletion vector enabled
+ * table.
+ */
+public class DvTableReadableSnapshotRetriever implements AutoCloseable {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DvTableReadableSnapshotRetriever.class);
+
+    private final TablePath tablePath;
+    private final long tableId;
+    private final FileStoreTable fileStoreTable;
+    private final Admin flussAdmin;
+    private final Connection flussConnection;
+    private final SnapshotManager snapshotManager;
+
+    public DvTableReadableSnapshotRetriever(
+            TablePath tablePath,
+            long tableId,
+            FileStoreTable paimonFileStoreTable,
+            Configuration flussConfig) {
+        this.tablePath = tablePath;
+        this.tableId = tableId;
+        this.fileStoreTable = paimonFileStoreTable;
+        this.flussConnection = ConnectionFactory.createConnection(flussConfig);
+        this.flussAdmin = flussConnection.getAdmin();
+        this.snapshotManager = fileStoreTable.snapshotManager();
+    }
+
+    /**
+     * Get readable offsets for DV tables based on the latest compacted 
snapshot.
+     *
+     * <p>For Paimon DV tables, when an appended snapshot is committed, we 
need to check the latest
+     * compacted snapshot to determine readable offsets for each bucket. This 
method implements
+     * incremental advancement of readable_snapshot per bucket:
+     *
+     * <ul>
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot. These
+     *       buckets can advance their readable offsets since all their data 
is in base files (L1+).
+     *   <li>For buckets with L0 files: traverse backwards through compacted 
snapshots to find the
+     *       latest one that flushed this bucket's L0 files. Then find the 
latest snapshot that
+     *       exactly holds those flushed L0 files, and use the previous APPEND 
snapshot's offset for
+     *       that bucket.
+     * </ul>
+     *
+     * <p>Algorithm:
+     *
+     * <ol>
+     *   <li>Find the latest compacted snapshot before the given tiered 
snapshot
+     *   <li>Check which buckets have no L0 files and which have L0 files in 
the compacted snapshot
+     *   <li>For buckets without L0 files: use offsets from the latest tiered 
snapshot (all data is
+     *       in base files, safe to advance)
+     *   <li>For buckets with L0 files:
+     *       <ol>
+     *         <li>Traverse backwards through compacted snapshots starting 
from the latest one
+     *         <li>For each compacted snapshot, check which buckets had their 
L0 files flushed
+     *         <li>For each flushed bucket, find the latest snapshot that 
exactly holds those L0
+     *             files using {@link 
PaimonDvTableUtils#findLatestSnapshotExactlyHoldingL0Files}
+     *         <li>Find the previous APPEND snapshot before that snapshot
+     *         <li>Use that APPEND snapshot's offset for the bucket
+     *       </ol>
+     *   <li>Return readable offsets for all buckets, allowing incremental 
advancement
+     * </ol>
+     *
+     * <p>Note: This allows readable_snapshot to advance incrementally per 
bucket. Each bucket's
+     * readable offset is set to the maximum offset that is actually readable 
in the compacted
+     * snapshot, ensuring no data duplication or loss. The readable_snapshot 
is set to the latest
+     * compacted snapshot ID, and each bucket continues reading from its 
respective readable offset.
+     *
+     * <p>Example: If bucket0's L0 files were flushed in snapshot5 (which 
compacted snapshot1's L0
+     * files), and snapshot4 is the latest snapshot that exactly holds those 
L0 files, then
+     * bucket0's readable offset will be set to snapshot4's previous APPEND 
snapshot's offset.
+     *
+     * @param tieredSnapshotId the tiered snapshot ID (the appended snapshot 
that was just
+     *     committed)
+     * @return a tuple containing the readable snapshot ID (the latest 
compacted snapshot) and a map
+     *     of TableBucket to readable offset for all buckets, or null if:
+     *     <ul>
+     *       <li>No compacted snapshot exists before the tiered snapshot
+     *       <li>Cannot find the latest snapshot holding flushed L0 files for 
some buckets
+     *       <li>Cannot find the previous APPEND snapshot for some buckets
+     *       <li>Cannot find offsets in Fluss for some buckets
+     *     </ul>
+     *     The map contains offsets for ALL buckets, allowing incremental 
advancement.
+     * @throws IOException if an error occurs reading snapshots or offsets 
from Fluss
+     */
+    @Nullable
+    public ReadableSnapshotResult getReadableSnapshotAndOffsets(long 
tieredSnapshotId)
+            throws IOException {
+        // Find the latest compacted snapshot
+        Snapshot latestCompactedSnapshot =
+                findPreviousSnapshot(tieredSnapshotId, 
Snapshot.CommitKind.COMPACT);
+        if (latestCompactedSnapshot == null) {
+            // No compacted snapshot found, may happen when no compaction 
happens or snapshot
+            // expiration, we can't update readable offsets, return null 
directly
+            LOG.info(
+                    "Can't find latest compacted snapshot before snapshot {}, 
skip get readable snapshot.",
+                    tieredSnapshotId);
+            return null;
+        }
+
+        Map<TableBucket, Long> readableOffsets = new HashMap<>();
+
+        FlussTableBucketMapper flussTableBucketMapper = new 
FlussTableBucketMapper();
+
+        // get all the bucket without l0 files and with l0 files
+        Tuple2<Set<PartitionBucket>, Set<PartitionBucket>> 
bucketsWithoutL0AndWithL0 =
+                getBucketsWithAndWithoutL0AndWithL0(latestCompactedSnapshot);
+        Set<PartitionBucket> bucketsWithoutL0 = bucketsWithoutL0AndWithL0.f0;
+        Set<PartitionBucket> bucketsWithL0 = bucketsWithoutL0AndWithL0.f1;
+
+        // Track the earliest previousAppendSnapshot ID that was accessed
+        // This represents the oldest snapshot that might still be needed
+        long earliestSnapshotIdToKeep = LakeCommitResult.KEEP_ALL_PREVIOUS;
+
+        if (!bucketsWithoutL0.isEmpty()) {
+            // Get latest tiered offsets
+            LakeSnapshot latestTieredSnapshot;
+            try {
+                latestTieredSnapshot = 
flussAdmin.getLatestLakeSnapshot(tablePath).get();
+            } catch (Exception e) {
+                throw new IOException(
+                        "Failed to read lake snapshot from Fluss server for 
snapshot "
+                                + latestCompactedSnapshot.id(),
+                        e);
+            }
+            // for all buckets without l0, we can use the latest tiered offsets
+            for (PartitionBucket bucket : bucketsWithoutL0) {
+                TableBucket tableBucket = 
flussTableBucketMapper.toTableBucket(bucket);
+                if (tableBucket == null) {
+                    // can't map such paimon bucket to fluss, just ignore
+                    continue;
+                }
+                readableOffsets.put(
+                        tableBucket, 
latestTieredSnapshot.getTableBucketsOffset().get(tableBucket));
+            }
+        }
+
+        // for all buckets with l0, we need to find the latest compacted 
snapshot which flushed
+        // the buckets, the per-bucket offset should be updated to the 
corresponding compacted
+        // snapshot offsets
+        Set<PartitionBucket> allBucketsToAdvance = new 
HashSet<>(bucketsWithL0);
+
+        long earliestSnapshotId = 
checkNotNull(snapshotManager.earliestSnapshotId());
+        // From latestCompacted forward traverse compacted snapshots
+        for (long currentSnapshotId = latestCompactedSnapshot.id();
+                currentSnapshotId >= earliestSnapshotId;
+                currentSnapshotId--) {
+            // no any buckets to advance, break directly
+            if (allBucketsToAdvance.isEmpty()) {
+                break;
+            }
+            Snapshot currentSnapshot = 
snapshotManager.tryGetSnapshot(currentSnapshotId);
+            if (currentSnapshot == null
+                    || currentSnapshot.commitKind() != 
Snapshot.CommitKind.COMPACT) {
+                continue;
+            }
+            // Get buckets flushed by current compacted snapshot
+            Set<PartitionBucket> flushedBuckets = 
getBucketsWithFlushedL0(currentSnapshot);
+            // For each flushed bucket, if offset not set yet, set it
+            for (PartitionBucket partitionBucket : flushedBuckets) {
+                TableBucket tb = 
flussTableBucketMapper.toTableBucket(partitionBucket);
+                if (tb == null) {
+                    // can't map such paimon bucket to fluss,just ignore
+                    // don't need to advance offset for the bucket
+                    allBucketsToAdvance.remove(partitionBucket);
+                    continue;
+                }
+                if (!readableOffsets.containsKey(tb)) {
+                    Snapshot sourceSnapshot =
+                            findLatestSnapshotExactlyHoldingL0Files(
+                                    fileStoreTable, currentSnapshot);
+                    // it happens if there is a compacted snapshot flush l0 
files for a bucket,
+                    // but the snapshot from which the compacted snapshot 
compact is expired
+                    // it should happen rarely, we can't determine the 
readable offsets for this
+                    // bucket, currently, we just return null to stop readable 
offset advance
+                    // if it happen, compaction should work unexpected, warn 
it and reminds to
+                    // increase snapshot retention
+                    if (sourceSnapshot == null) {
+                        LOG.warn(
+                                "Cannot find snapshot holding L0 files flushed 
by compacted snapshot {} for bucket {}, "
+                                        + "the snapshot may have been expired. 
Consider increasing snapshot retention.",
+                                currentSnapshot.id(),
+                                tb);
+                        return null;
+                    }
+
+                    // we already find that for this bucket, which snapshot do 
the latest flush,
+                    // the offset for the previous one append snapshot should 
be the readable
+                    // offset
+                    Snapshot previousAppendSnapshot =
+                            sourceSnapshot.commitKind() == 
Snapshot.CommitKind.APPEND
+                                    ? sourceSnapshot
+                                    : findPreviousSnapshot(
+                                            sourceSnapshot.id(), 
Snapshot.CommitKind.APPEND);
+
+                    // Can't find previous APPEND snapshot, likely due to 
snapshot expiration.
+                    // This happens when the snapshot holding flushed L0 files 
is a COMPACT
+                    // snapshot,
+                    // and all APPEND snapshots before it have been expired.
+                    //
+                    // TODO: Optimization - Store compacted snapshot offsets 
in Fluss
+                    // Currently, we rely on Paimon to find the previous 
APPEND snapshot to get its
+                    // offset. If Fluss stores offsets for all snapshots 
(including COMPACT
+                    // snapshots),
+                    // we could:
+                    // 1. Use the sourceSnapshot's offset directly if it's 
stored in Fluss
+                    // 2. Find any previous snapshot (COMPACT or APPEND) and 
use its offset
+                    // 3. This would make the system more resilient to 
snapshot expiration
+                    if (previousAppendSnapshot == null) {
+                        LOG.warn(
+                                "Cannot find previous APPEND snapshot before 
snapshot {} for bucket {}. "
+                                        + "This may be due to snapshot 
expiration. Consider increasing paimon snapshot retention.",
+                                sourceSnapshot.id(),
+                                tb);
+                        return null;
+                    }
+
+                    // Track the minimum previousAppendSnapshot ID
+                    // This snapshot will be accessed via getLakeSnapshot, so 
we need to keep it
+                    if (earliestSnapshotIdToKeep <= 0
+                            || previousAppendSnapshot.id() < 
earliestSnapshotIdToKeep) {
+                        earliestSnapshotIdToKeep = previousAppendSnapshot.id();
+                    }
+
+                    try {
+                        LakeSnapshot lakeSnapshot =
+                                flussAdmin
+                                        .getLakeSnapshot(tablePath, 
previousAppendSnapshot.id())
+                                        .get();
+                        Long offset = 
lakeSnapshot.getTableBucketsOffset().get(tb);
+                        if (offset != null) {
+                            readableOffsets.put(tb, offset);
+                            allBucketsToAdvance.remove(partitionBucket);
+                        } else {
+                            LOG.error(
+                                    "Could not find offset for bucket {} in 
snapshot {}, skip advancing readable snapshot.",
+                                    tb,
+                                    previousAppendSnapshot.id());
+                            return null;
+                        }
+                    } catch (Exception e) {
+                        LOG.error(
+                                "Failed to read lake snapshot {} from Fluss 
server, skip update readable snapshot and offset.",
+                                previousAppendSnapshot.id(),
+                                e);
+                        return null;
+                    }
+                }
+            }
+        }
+
+        // This happens when there are writes to a bucket, but no compaction 
has happened for that
+        // bucket from the earliest snapshot to the latest compacted snapshot.
+        // This should happen rarely in practice, as compaction typically 
processes all buckets over
+        // time.
+        //
+        // TODO: Optimization - Handle buckets without flushed L0 files
+        // We can optimize this case in two ways:
+        // 1. If a previous readable snapshot exists between earliest and 
latest snapshot:
+        //    - Reuse the readable snapshot's offset for this bucket (safe 
since no L0 was flushed)
+        // 2. If the earliest snapshot is the first snapshot committed by 
Fluss:
+        //    - Set the readable offset to 0 for this bucket (no data was 
readable before)
+        // These optimizations would allow readable_snapshot to advance even 
when some buckets
+        // haven't been compacted yet, improving overall system progress.
+        if (!allBucketsToAdvance.isEmpty()) {
+            LOG.warn(
+                    "Could not find flushed snapshots for buckets with L0: {}. 
"
+                            + "These buckets have L0 files but no found 
compaction snapshot has flushed them yet."
+                            + " Consider increasing paimon snapshot 
retention.",
+                    allBucketsToAdvance);
+            return null;
+        }
+
+        // to get the the tiered offset for the readable snapshot,
+        Snapshot previousSnapshot =
+                findPreviousSnapshot(latestCompactedSnapshot.id(), 
Snapshot.CommitKind.APPEND);
+        if (previousSnapshot == null) {
+            LOG.warn(
+                    "Failed to find a previous APPEND snapshot before 
compacted snapshot {} for table {}. "
+                            + "This prevents retrieving baseline offsets from 
Fluss.",
+                    latestCompactedSnapshot.id(),
+                    tablePath);
+            return null;
+        }
+
+        Map<TableBucket, Long> tieredOffsets;
+        try {
+            tieredOffsets =
+                    flussAdmin
+                            .getLakeSnapshot(tablePath, previousSnapshot.id())
+                            .get()
+                            .getTableBucketsOffset();
+        } catch (Exception e) {
+            // we can't get tieredOffsets, just skip it
+            LOG.error(
+                    "Failed to retrieve lake snapshot {} from Fluss server for 
table {}. "
+                            + "The readable snapshot advancement will be 
skipped.",
+                    previousSnapshot.id(),
+                    tablePath,
+                    e);
+            return null;
+        }
+
+        // Return the latest compacted snapshot ID as the unified readable 
snapshot
+        // All buckets can read from this snapshot's base files, then continue 
from their
+        // respective readable offsets
+        // Also return the minimum previousAppendSnapshot ID that was accessed
+        // Snapshots before this ID can potentially be safely deleted from 
Fluss
+        return new ReadableSnapshotResult(
+                latestCompactedSnapshot.id(),
+                tieredOffsets,
+                readableOffsets,
+                earliestSnapshotIdToKeep);
+    }
+
+    /**
+     * Get buckets (with partition info) that have no L0 files in the given 
snapshot.
+     *
+     * <p>For Paimon DV tables, we check the snapshot's data files to 
determine which buckets have
+     * no L0 delta files. A bucket has no L0 files if all its data is in base 
files.
+     *
+     * <p>For partitioned tables, we include partition information in the 
returned TableBucket
+     * objects.
+     *
+     * @param snapshot the snapshot to check
+     * @return set of TableBucket that have no L0 files
+     */
+    private Tuple2<Set<PartitionBucket>, Set<PartitionBucket>> 
getBucketsWithAndWithoutL0AndWithL0(
+            Snapshot snapshot) {
+        Set<PartitionBucket> bucketsWithoutL0 = new HashSet<>();
+        Set<PartitionBucket> bucketsWithL0 = new HashSet<>();
+
+        // Scan the snapshot to get all splits including level0
+        Map<String, String> scanOptions = new HashMap<>();
+        scanOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), 
String.valueOf(snapshot.id()));
+        // hacky: set batch scan mode to compact to make sure we can get l0 
level files
+        scanOptions.put(
+                CoreOptions.BATCH_SCAN_MODE.key(), 
CoreOptions.BatchScanMode.COMPACT.getValue());
+
+        Map<BinaryRow, Map<Integer, List<ManifestEntry>>> manifestsByBucket =
+                FileStoreScan.Plan.groupByPartFiles(
+                        
fileStoreTable.copy(scanOptions).store().newScan().plan().files());
+
+        for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> 
manifestsByBucketEntry :
+                manifestsByBucket.entrySet()) {
+            BinaryRow partition = manifestsByBucketEntry.getKey();
+            Map<Integer, List<ManifestEntry>> buckets = 
manifestsByBucketEntry.getValue();
+            for (Map.Entry<Integer, List<ManifestEntry>> bucketEntry : 
buckets.entrySet()) {
+                // no l0 file
+                if (bucketEntry.getValue().stream()
+                        .allMatch(
+                                manifestEntry ->
+                                        manifestEntry.kind() != FileKind.DELETE
+                                                && 
manifestEntry.file().level() > 0)) {
+                    bucketsWithoutL0.add(new PartitionBucket(partition, 
bucketEntry.getKey()));
+                } else {
+                    bucketsWithL0.add(new PartitionBucket(partition, 
bucketEntry.getKey()));
+                }
+            }
+        }
+        return Tuple2.of(bucketsWithoutL0, bucketsWithL0);
+    }
+
+    /**
+     * Get buckets (with partition info) whose L0 files were flushed (deleted) 
in a compacted
+     * snapshot's delta.
+     *
+     * @param compactedSnapshot the compacted snapshot to check
+     * @return set of PartitionBucket whose L0 files were flushed
+     */
+    private Set<PartitionBucket> getBucketsWithFlushedL0(Snapshot 
compactedSnapshot) {
+        checkState(compactedSnapshot.commitKind() == 
Snapshot.CommitKind.COMPACT);
+        Set<PartitionBucket> flushedBuckets = new HashSet<>();
+
+        // Scan the compacted snapshot's delta to find deleted L0 files
+        List<ManifestEntry> manifestEntries =
+                fileStoreTable
+                        .store()
+                        .newScan()
+                        .withSnapshot(compactedSnapshot.id())
+                        .withKind(ScanMode.DELTA)
+                        .plan()
+                        .files(FileKind.DELETE);
+
+        for (ManifestEntry manifestEntry : manifestEntries) {
+            if (manifestEntry.level() == 0) {
+                flushedBuckets.add(
+                        new PartitionBucket(manifestEntry.partition(), 
manifestEntry.bucket()));
+            }
+        }
+
+        return flushedBuckets;
+    }
+
+    @Nullable
+    private Snapshot findPreviousSnapshot(long beforeSnapshotId, 
Snapshot.CommitKind commitKind)
+            throws IOException {
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+        long earliestSnapshotId = 
checkNotNull(snapshotManager.earliestSnapshotId());
+        for (long currentSnapshotId = beforeSnapshotId - 1;
+                currentSnapshotId >= earliestSnapshotId;
+                currentSnapshotId--) {
+            Snapshot snapshot = 
snapshotManager.tryGetSnapshot(currentSnapshotId);
+            if (snapshot != null && snapshot.commitKind() == commitKind) {
+                return snapshot;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Get partition name to partition id mapping for the table.
+     *
+     * @return map from partition name to partition id
+     */
+    private Map<String, Long> getPartitionNameToIdMapping() throws IOException 
{
+        try {
+            List<PartitionInfo> partitionInfos = 
flussAdmin.listPartitionInfos(tablePath).get();
+            return partitionInfos.stream()
+                    .collect(
+                            Collectors.toMap(
+                                    PartitionInfo::getPartitionName,
+                                    PartitionInfo::getPartitionId));
+        } catch (Exception e) {
+            throw new IOException("Fail to list partitions", e);
+        }
+    }
+
+    /**
+     * Convert Paimon BinaryRow partition to Fluss partition name, whose 
format is:
+     * value1$value2$...$valueN.
+     *
+     * @param partition the BinaryRow partition from Paimon
+     * @return partition name string
+     */
+    private String getPartitionNameFromBinaryRow(BinaryRow partition) {
+        List<String> partitionValues = new ArrayList<>();
+        for (int i = 0; i < partition.getFieldCount(); i++) {
+            // todo: consider other partition type
+            BinaryString binaryString = partition.getString(i);
+            partitionValues.add(binaryString.toString());
+        }
+        return String.join(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, 
partitionValues);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (flussAdmin != null) {
+            flussAdmin.close();
+        }
+        if (flussConnection != null) {
+            flussConnection.close();
+        }
+    }
+
+    /**
+     * Result of {@link #getReadableSnapshotAndOffsets}, containing readable 
snapshot information
+     * and the minimum snapshot ID that can be safely deleted.
+     */
+    public static class ReadableSnapshotResult {
+        private final long readableSnapshotId;
+        private final Map<TableBucket, Long> tieredOffsets;
+        private final Map<TableBucket, Long> readableOffsets;
+        private final long earliestSnapshotIdToKeep;
+
+        public ReadableSnapshotResult(
+                long readableSnapshotId,
+                Map<TableBucket, Long> tieredOffsets,
+                Map<TableBucket, Long> readableOffsets,
+                long earliestSnapshotIdToKeep) {
+            this.readableSnapshotId = readableSnapshotId;
+            this.tieredOffsets = tieredOffsets;
+            this.readableOffsets = readableOffsets;
+            this.earliestSnapshotIdToKeep = earliestSnapshotIdToKeep;
+        }
+
+        public Map<TableBucket, Long> getTieredOffsets() {
+            return tieredOffsets;
+        }
+
+        public long getReadableSnapshotId() {
+            return readableSnapshotId;
+        }
+
+        public Map<TableBucket, Long> getReadableOffsets() {
+            return readableOffsets;
+        }
+
+        /**
+         * Returns the earliest snapshot ID that should keep in Fluss.
+         *
+         * <p>This is the earliest ID among all snapshot that were accessed 
via {@code
+         * getLakeSnapshot} during the retrieve readable offset. Snapshots 
before this ID can
+         * potentially be safely deleted.
+         */
+        public long getEarliestSnapshotIdToKeep() {
+            return earliestSnapshotIdToKeep;
+        }
+    }
+
+    private static final class PartitionBucket {

Review Comment:
   Rename to `PaimonPartitionBucket` and make it public in the module to share 
with other classes. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
