platinumhamburg commented on code in PR #2179:
URL: https://github.com/apache/fluss/pull/2179#discussion_r2684178818


##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -412,6 +415,56 @@ CompletableFuture<Void> dropPartition(
     CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
             TableBucket bucket, long snapshotId);
 
+    /**
+     * Acquires a lease for specific KV snapshots of the given tableBuckets asynchronously.
+     *
+     * <p>Once acquired, the specified KV snapshots will be protected from garbage collection for
+     * the duration of the {@code leaseDuration}. The client must call {@link
+     * #releaseKvSnapshotLease} to release the lock early when reading is finished.
+     *
+     * <p>If the lease expires (no renew received within duration), the server is free to delete the
+     * snapshot files.
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on returned future:
+     *
+     * <ul>
+     *   <li>{@link TableNotExistException} if the table does not exist.
+     *   <li>{@link PartitionNotExistException} if the partition does not exist.
+     * </ul>
+     *
+     * @param leaseId The unique ID for this lease session (usually a UUID generated by client).
+     * @param snapshotIds The snapshots to lease, a map from TableBucket to kvSnapshotId.
+     * @param leaseDuration The duration (in milliseconds) for which the snapshots should be kept.
+     * @return The result of the acquire operation, containing any buckets that failed to be locked.
+     */
+    CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
+            String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration);
+
+    /**
+     * Releases the lease for specific tableBuckets asynchronously.
+     *
+     * <p>This is typically called when a client finishes reading a specific bucket (or a batch of
+     * buckets) but is still reading others under the same leaseId.
+     *
+     * <p>If {@code bucketsToRelease} contains all buckets under this leaseId, the lease itself will
+     * be removed.
+     *
+     * @param leaseId The lease id.
+     * @param bucketsToRelease The specific tableBuckets to release.
+     */
+    CompletableFuture<Void> releaseKvSnapshotLease(
+            String leaseId, Set<TableBucket> bucketsToRelease);
+
+    /**
+     * Releases the entire lease asynchronously.
+     *
+     * <p>All snapshots locked under this {@code leaseId} will be released immediately. This is
+     * equivalent to calling {@link #releaseKvSnapshotLease} with all held buckets.
+     *
+     * @param leaseId The lease id to release.
+     */
+    CompletableFuture<Void> dropKvSnapshotLease(String leaseId);

Review Comment:
   rename to releaseAllSnapshotLeases()
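   Side note on usage: a minimal client-side sketch of the API added above (the `admin` handle, `tableBucket`, `snapshotId` and the chosen duration are placeholders, not code from this PR):

```java
// Placeholder values; only the method and result names come from the interface above.
String leaseId = UUID.randomUUID().toString();
Map<TableBucket, Long> snapshotIds = new HashMap<>();
snapshotIds.put(tableBucket, snapshotId);

// Keep the snapshots alive for 5 minutes while they are being read.
AcquireKvSnapshotLeaseResult result =
        admin.acquireKvSnapshotLease(leaseId, snapshotIds, Duration.ofMinutes(5).toMillis()).get();

// ... read the snapshot files ...

// Release a finished bucket early, or drop the whole lease when done.
admin.releaseKvSnapshotLease(leaseId, Collections.singleton(tableBucket)).get();
admin.dropKvSnapshotLease(leaseId).get();
```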



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java:
##########
@@ -387,6 +392,33 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
                 .thenApply(ClientRpcMessageUtils::toKvSnapshotMetadata);
     }
 
+    @Override
+    public CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
+            String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration) {
+        if (snapshotIds.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "The snapshotIds to acquire kv snapshot lease is empty");
+        }
+
+        return gateway.acquireKvSnapshotLease(
+                        makeAcquireKvSnapshotLeaseRequest(leaseId, snapshotIds, leaseDuration))
+                .thenApply(ClientRpcMessageUtils::toAcquireKvSnapshotLeaseResult);
+    }
+
+    @Override
+    public CompletableFuture<Void> releaseKvSnapshotLease(
+            String leaseId, Set<TableBucket> bucketsToRelease) {
+        return gateway.releaseKvSnapshotLease(
+                        makeReleaseKvSnapshotLeaseRequest(leaseId, bucketsToRelease))
+                .thenApply(r -> null);
+    }
+
+    @Override
+    public CompletableFuture<Void> dropKvSnapshotLease(String leaseId) {

Review Comment:
   rename to releaseAllKvSnapshotLeases()



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+    private final KvSnapshotLeaseMetadataManager metadataHelper;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> leaseLocks = MapUtils.newConcurrentHashMap();
+    /** lease id to kv snapshot lease. */
+    @GuardedBy("leaseLocks")
+    private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * KvSnapshotLeaseForBucket to the ref count, which means this table bucket + snapshotId has
+     * been leased by how many lease id.
+     */
+    private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                metadataHelper,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    @VisibleForTesting
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.metadataHelper = metadataHelper;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        LOG.info("kv snapshot lease manager has been started.");
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireLeases,
+                0L,
+                conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public void initialize() throws Exception {
+        for (String leaseId : metadataHelper.getLeasesList()) {
+            Optional<KvSnapshotLease> kvSnapshotLeaseOpt = metadataHelper.getLease(leaseId);

Review Comment:
   In high-availability mode, can this initialization model ensure distributed concurrency safety?
   



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);

Review Comment:
   It's very odd for an atomic type to be used as a constant.
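   One way to avoid it (a sketch only, reusing the fields and the `inReadLock` helper from this diff): drop the shared constant and read the primitive count directly, treating a missing entry as zero.

```java
// Sketch: no shared AtomicInteger "constant"; a missing entry simply counts as zero.
public boolean snapshotLeaseNotExist(KvSnapshotLeaseForBucket kvSnapshotLeaseForBucket) {
    return inReadLock(
            refCountLock,
            () -> {
                AtomicInteger count = refCount.get(kvSnapshotLeaseForBucket);
                return count == null || count.get() <= 0;
            });
}
```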
   
   



##########
fluss-server/src/test/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLeaseMetadataManagerTest.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lease;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+import org.apache.fluss.utils.IOUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link KvSnapshotLeaseMetadataManager}. */
+public class KvSnapshotLeaseMetadataManagerTest {
+
+    @RegisterExtension
+    public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    protected static ZooKeeperClient zookeeperClient;
+    private @TempDir Path tempDir;
+    private KvSnapshotLeaseMetadataManager metadataHelper;
+
+    @BeforeAll
+    static void beforeAll() {
+        zookeeperClient =
+                ZOO_KEEPER_EXTENSION_WRAPPER
+                        .getCustomExtension()
+                        .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        metadataHelper = new KvSnapshotLeaseMetadataManager(zookeeperClient, tempDir.toString());
+    }
+
+    @AfterEach
+    void afterEach() {
+        ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
+    }
+
+    @Test
+    void testGetLeasesList() throws Exception {
+        List<String> leasesList = metadataHelper.getLeasesList();
+        assertThat(leasesList).isEmpty();
+
+        metadataHelper.registerLease("leaseId1", new KvSnapshotLease(1000L));
+
+        Map<Long, KvSnapshotTableLease> tableIdToTableLease = new HashMap<>();
+        tableIdToTableLease.put(1L, new KvSnapshotTableLease(1L, new Long[] {100L, -1L}));
+        metadataHelper.registerLease("leaseId2", new KvSnapshotLease(2000L, tableIdToTableLease));
+        leasesList = metadataHelper.getLeasesList();
+        assertThat(leasesList).containsExactlyInAnyOrder("leaseId1", "leaseId2");
+    }
+
+    @Test
+    void testRegisterAndUpdateLease() throws Exception {
+        Map<Long, KvSnapshotTableLease> tableIdToTableLease = new HashMap<>();
+        tableIdToTableLease.put(1L, new KvSnapshotTableLease(1L, new Long[] {100L, -1L}));
+
+        Map<Long, Long[]> partitionSnapshots = new HashMap<>();
+        partitionSnapshots.put(1000L, new Long[] {111L, 122L});
+        partitionSnapshots.put(1001L, new Long[] {122L, -1L});
+        tableIdToTableLease.put(2L, new KvSnapshotTableLease(2L, partitionSnapshots));
+
+        KvSnapshotLease expectedLease = new KvSnapshotLease(1000L, tableIdToTableLease);
+        metadataHelper.registerLease("leaseId1", expectedLease);

Review Comment:
   The metadataHelper in the code can be uniformly renamed to leaseMetadataManager.
   
   



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLease.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lease;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.MapUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+/** The entity of kv snapshot lease. */
+@NotThreadSafe
+public class KvSnapshotLease {
+    private long expirationTime;
+
+    /** A map from table id to kv snapshot lease for one table. */
+    private final Map<Long, KvSnapshotTableLease> tableIdToTableLease;
+
+    public KvSnapshotLease(long leaseDuration) {
+        this(leaseDuration, MapUtils.newConcurrentHashMap());
+    }
+
+    public KvSnapshotLease(
+            long expirationTime, Map<Long, KvSnapshotTableLease> tableIdToTableLease) {

Review Comment:
   expirationTime is a more precise variable and parameter name than duration. However, the codebase extensively mixes both duration and expirationTime; please standardize them.
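   One illustrative way to keep the two concepts apart (not the PR's API, just a sketch using the classes from this diff):

```java
// Illustrative factories only; the real constructors may stay as they are.
public static KvSnapshotLease ofExpirationTime(long expirationTimeMs) {
    return new KvSnapshotLease(expirationTimeMs, MapUtils.newConcurrentHashMap());
}

public static KvSnapshotLease ofDuration(long leaseDurationMs, Clock clock) {
    // expirationTime = now + duration, computed in exactly one place
    return ofExpirationTime(clock.milliseconds() + leaseDurationMs);
}
```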
   
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/DropKvSnapshotLeaseProcedure.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/** Procedure to release kv snapshot lease. */
+public class DropKvSnapshotLeaseProcedure extends ProcedureBase {
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "leaseId", type = @DataTypeHint("STRING")),
+            })
+    public String[] call(ProcedureContext context, String leaseId) throws Exception {

Review Comment:
   Add a description in the procedures.md document.
   
   



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLease.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lease;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.MapUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+/** The entity of kv snapshot lease. */
+@NotThreadSafe
+public class KvSnapshotLease {
+    private long expirationTime;
+
+    /** A map from table id to kv snapshot lease for one table. */
+    private final Map<Long, KvSnapshotTableLease> tableIdToTableLease;
+
+    public KvSnapshotLease(long leaseDuration) {
+        this(leaseDuration, MapUtils.newConcurrentHashMap());
+    }
+
+    public KvSnapshotLease(
+            long expirationTime, Map<Long, KvSnapshotTableLease> tableIdToTableLease) {

Review Comment:
   See `FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_DURATION` also.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+    private final KvSnapshotLeaseMetadataManager metadataHelper;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> leaseLocks = MapUtils.newConcurrentHashMap();
+    /** lease id to kv snapshot lease. */
+    @GuardedBy("leaseLocks")
+    private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * KvSnapshotLeaseForBucket to the ref count, which means this table bucket + snapshotId has
+     * been leased by how many lease id.
+     */
+    private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                metadataHelper,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    @VisibleForTesting
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.metadataHelper = metadataHelper;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        LOG.info("kv snapshot lease manager has been started.");
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireLeases,
+                0L,
+                conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public void initialize() throws Exception {
+        for (String leaseId : metadataHelper.getLeasesList()) {
+            Optional<KvSnapshotLease> kvSnapshotLeaseOpt = metadataHelper.getLease(leaseId);
+            if (kvSnapshotLeaseOpt.isPresent()) {
+                KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseOpt.get();
+                this.leaseLocks.put(leaseId, new ReentrantReadWriteLock());
+                this.kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+
+                initializeRefCount(kvSnapshotLease);
+
+                leasedBucketCount.addAndGet(kvSnapshotLease.getLeasedSnapshotCount());
+            }
+        }
+    }
+
+    public boolean snapshotLeaseNotExist(KvSnapshotLeaseForBucket kvSnapshotLeaseForBucket) {
+        return inReadLock(
+                refCountLock,
+                () ->
+                        refCount.getOrDefault(kvSnapshotLeaseForBucket, new AtomicInteger(0)).get()
+                                <= 0);
+    }
+
+    /**
+     * Acquire kv snapshot lease.
+     *
+     * @param leaseId the lease id
+     * @param leaseDuration the lease duration
+     * @param tableIdToLeaseBucket the table id to lease bucket
+     * @return the map of unavailable snapshots that failed to be leased
+     */
+    public Map<TableBucket, Long> acquireLease(
+            String leaseId,
+            long leaseDuration,
+            Map<Long, List<KvSnapshotLeaseForBucket>> tableIdToLeaseBucket)
+            throws Exception {
+        ReadWriteLock lock = leaseLocks.computeIfAbsent(leaseId, k -> new ReentrantReadWriteLock());
+        return inWriteLock(
+                lock,
+                () -> {
+                    // To record the unavailable snapshots such as the kv snapshotId to lease not
+                    // exists.
+                    Map<TableBucket, Long> unavailableSnapshots = new HashMap<>();
+
+                    boolean update = kvSnapshotLeaseMap.containsKey(leaseId);
+                    KvSnapshotLease kvSnapshotLease;
+                    long newLeaseDuration = clock.milliseconds() + leaseDuration;
+                    if (!update) {
+                        // set the expiration time as: current time + leaseDuration
+                        kvSnapshotLease = new KvSnapshotLease(newLeaseDuration);
+                        kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+                        LOG.info(
+                                "kv snapshot lease '"
+                                        + leaseId
+                                        + "' has been acquired. The lease expiration time is "
+                                        + kvSnapshotLease.getExpirationTime());
+                    } else {
+                        kvSnapshotLease = kvSnapshotLeaseMap.get(leaseId);
+                        kvSnapshotLease.setExpirationTime(newLeaseDuration);
+                    }
+
+                    for (Map.Entry<Long, List<KvSnapshotLeaseForBucket>> entry :
+                            tableIdToLeaseBucket.entrySet()) {
+                        Long tableId = entry.getKey();
+                        TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId);
+                        int numBuckets = tableInfo.getNumBuckets();
+                        List<KvSnapshotLeaseForBucket> buckets = entry.getValue();
+                        for (KvSnapshotLeaseForBucket bucket : buckets) {
+
+                            TableBucket tableBucket = bucket.getTableBucket();
+                            long kvSnapshotId = bucket.getKvSnapshotId();
+                            try {
+                                boolean snapshotExists =
+                                        metadataHelper.isSnapshotExists(tableBucket, kvSnapshotId);
+                                if (!snapshotExists) {
+                                    unavailableSnapshots.put(tableBucket, kvSnapshotId);
+                                    continue;
+                                }
+                            } catch (Exception e) {
+                                LOG.error(
+                                        "Failed to check snapshotExists for tableBucket when acquire kv "
+                                                + "snapshot kvSnapshotLease {}.",
+                                        tableBucket,
+                                        e);
+                                unavailableSnapshots.put(tableBucket, kvSnapshotId);
+                                continue;
+                            }
+
+                            long originalSnapshotId =
+                                    kvSnapshotLease.acquireBucket(
+                                            tableBucket, kvSnapshotId, numBuckets);
+                            if (originalSnapshotId == -1L) {
+                                leasedBucketCount.incrementAndGet();
+                            } else {
+                                // clear the original ref.
+                                decrementRefCount(
+                                        new KvSnapshotLeaseForBucket(
+                                                tableBucket, originalSnapshotId));
+                            }
+                            incrementRefCount(bucket);
+                        }
+                    }
+
+                    if (update) {
+                        metadataHelper.updateLease(leaseId, kvSnapshotLease);
+                    } else {
+                        metadataHelper.registerLease(leaseId, kvSnapshotLease);
+                    }
+
+                    return unavailableSnapshots;
+                });
+    }
+
+    public void release(String leaseId, Map<Long, List<TableBucket>> tableIdToUnregisterBucket)
+            throws Exception {
+        ReadWriteLock lock = leaseLocks.get(leaseId);
+        if (lock == null) {
+            return;
+        }
+
+        inWriteLock(
+                lock,
+                () -> {
+                    KvSnapshotLease lease = kvSnapshotLeaseMap.get(leaseId);
+                    if (lease == null) {
+                        return;
+                    }
+
+                    for (Map.Entry<Long, List<TableBucket>> entry :
+                            tableIdToUnregisterBucket.entrySet()) {
+                        List<TableBucket> buckets = entry.getValue();
+                        for (TableBucket bucket : buckets) {
+                            long snapshotId = lease.releaseBucket(bucket);
+                            if (snapshotId != -1L) {
+                                leasedBucketCount.decrementAndGet();
+                                decrementRefCount(new KvSnapshotLeaseForBucket(bucket, snapshotId));
+                            }
+                        }
+                    }
+
+                    if (lease.isEmpty()) {
+                        drop(leaseId);
+                    } else {
+                        metadataHelper.updateLease(leaseId, lease);
+                    }
+                });
+    }
+
+    /**
+     * Drop kv snapshot lease.
+     *
+     * @param leaseId the lease id
+     * @return true if clear success, false if lease not exist
+     */
+    public boolean drop(String leaseId) throws Exception {
+        ReadWriteLock lock = leaseLocks.get(leaseId);
+        if (lock == null) {
+            return false;
+        }
+
+        boolean exist =
+                inWriteLock(
+                        lock,
+                        () -> {
+                            KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseMap.remove(leaseId);
+                            if (kvSnapshotLease == null) {
+                                return false;
+                            }
+
+                            clearRefCount(kvSnapshotLease);
+                            metadataHelper.deleteLease(leaseId);
+
+                            LOG.info("kv snapshot lease '" + leaseId + "' has been dropped.");
+                            return true;
+                        });
+
+        leaseLocks.remove(leaseId);
+        return exist;
+    }
+
+    private void initializeRefCount(KvSnapshotLease lease) {
+        for (Map.Entry<Long, KvSnapshotTableLease> tableEntry :
+                lease.getTableIdToTableLease().entrySet()) {
+            long tableId = tableEntry.getKey();
+            KvSnapshotTableLease tableLease = tableEntry.getValue();
+            if (tableLease.getBucketSnapshots() != null) {
+                Long[] snapshots = tableLease.getBucketSnapshots();
+                for (int i = 0; i < snapshots.length; i++) {
+                    if (snapshots[i] == -1L) {
+                        continue;
+                    }
+
+                    incrementRefCount(
+                            new KvSnapshotLeaseForBucket(
+                                    new TableBucket(tableId, i), snapshots[i]));
+                }
+            } else {
+                Map<Long, Long[]> partitionSnapshots = tableLease.getPartitionSnapshots();
+                for (Map.Entry<Long, Long[]> entry : partitionSnapshots.entrySet()) {
+                    Long partitionId = entry.getKey();
+                    Long[] snapshots = entry.getValue();
+                    for (int i = 0; i < snapshots.length; i++) {
+                        if (snapshots[i] == -1L) {
+                            continue;
+                        }
+
+                        incrementRefCount(
+                                new KvSnapshotLeaseForBucket(
+                                        new TableBucket(tableId, partitionId, i), snapshots[i]));
+                    }
+                }
+            }
+        }
+    }
+
+    private void clearRefCount(KvSnapshotLease lease) {
+        for (Map.Entry<Long, KvSnapshotTableLease> tableEntry :
+                lease.getTableIdToTableLease().entrySet()) {
+            long tableId = tableEntry.getKey();
+            KvSnapshotTableLease tableLease = tableEntry.getValue();
+            if (tableLease.getBucketSnapshots() != null) {
+                Long[] snapshots = tableLease.getBucketSnapshots();
+                for (int i = 0; i < snapshots.length; i++) {
+                    if (snapshots[i] == -1L) {
+                        continue;
+                    }
+                    decrementRefCount(
+                            new KvSnapshotLeaseForBucket(
+                                    new TableBucket(tableId, i), snapshots[i]));
+                    leasedBucketCount.decrementAndGet();
+                }
+            } else {
+                Map<Long, Long[]> partitionSnapshots = tableLease.getPartitionSnapshots();
+                for (Map.Entry<Long, Long[]> entry : partitionSnapshots.entrySet()) {
+                    Long partitionId = entry.getKey();
+                    Long[] snapshots = entry.getValue();
+                    for (int i = 0; i < snapshots.length; i++) {
+                        if (snapshots[i] == -1L) {
+                            continue;
+                        }
+
+                        decrementRefCount(
+                                new KvSnapshotLeaseForBucket(
+                                        new TableBucket(tableId, partitionId, i), snapshots[i]));
+                        leasedBucketCount.decrementAndGet();
+                    }
+                }
+            }
+        }
+    }
+
+    private void incrementRefCount(KvSnapshotLeaseForBucket kvSnapshotLeaseForBucket) {
+        inWriteLock(
+                refCountLock,
+                () ->
+                        refCount.computeIfAbsent(
+                                        kvSnapshotLeaseForBucket, k -> new AtomicInteger(0))
+                                .incrementAndGet());
+    }
+
+    private void decrementRefCount(KvSnapshotLeaseForBucket kvSnapshotLeaseForBucket) {
+        inWriteLock(
+                refCountLock,
+                () -> {
+                    AtomicInteger atomicInteger = refCount.get(kvSnapshotLeaseForBucket);
+                    if (atomicInteger != null) {
+                        int decrementAndGet = atomicInteger.decrementAndGet();
+                        if (decrementAndGet <= 0) {
+                            refCount.remove(kvSnapshotLeaseForBucket);
+                        }
+                    }
+                });
+    }
+
+    private void expireLeases() {
+        long currentTime = clock.milliseconds();
+        kvSnapshotLeaseMap.entrySet().stream()
+                .filter(entry -> entry.getValue().getExpirationTime() < currentTime)
+                .map(Map.Entry::getKey)
+                .forEach(
+                        leaseId -> {
+                            try {
+                                drop(leaseId);

Review Comment:
   Although ConcurrentHashMap is used, calling drop to modify the map during iteration may lead to inconsistent behavior.
   Recommendation: first collect the lease IDs that need to be removed, then delete them in bulk.
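   A sketch of that shape, using the fields from this diff (the catch/log handling is an assumption, since the original catch body is not shown in this hunk; requires `java.util.stream.Collectors`):

```java
private void expireLeases() {
    long currentTime = clock.milliseconds();
    // 1) collect the expired lease ids without mutating the map during iteration
    List<String> expiredLeaseIds =
            kvSnapshotLeaseMap.entrySet().stream()
                    .filter(entry -> entry.getValue().getExpirationTime() < currentTime)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
    // 2) drop them afterwards, in bulk
    for (String leaseId : expiredLeaseIds) {
        try {
            drop(leaseId);
        } catch (Exception e) {
            // assumed handling; the original catch block is outside this hunk
            LOG.warn("Failed to drop expired kv snapshot lease {}.", leaseId, e);
        }
    }
}
```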



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -132,6 +140,13 @@ public class FlinkSourceEnumerator
     private final OffsetsInitializer startingOffsetsInitializer;
     private final OffsetsInitializer stoppingOffsetsInitializer;
 
+    private final LeaseContext leaseContext;
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    /** checkpointId -> tableBuckets who finished consume kv snapshots. */
+    @GuardedBy("lock")
+    private final TreeMap<Long, Set<TableBucket>> consumedKvSnapshotMap = new TreeMap<>();

Review Comment:
   Is there a concurrency safety issue here? Why is a read-write lock needed?
   
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/DropKvSnapshotLeaseProcedure.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/** Procedure to release kv snapshot lease. */
+public class DropKvSnapshotLeaseProcedure extends ProcedureBase {

Review Comment:
   It's recommended to improve the comments and include usage examples.
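   For example, the class Javadoc could carry something like the snippet below (the registered procedure name `sys.drop_kv_snapshot_lease` and the catalog name are assumptions; use whatever name this PR actually registers):

```java
// Assumed procedure and catalog names, for illustration only.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql("USE CATALOG fluss_catalog");
tEnv.executeSql("CALL sys.drop_kv_snapshot_lease('my-lease-id')");
```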
   
   



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+    private final KvSnapshotLeaseMetadataManager metadataHelper;
+    private final CoordinatorContext coordinatorContext;

Review Comment:
   You've annotated KvSnapshotLeaseManager with @ThreadSafe, which implies that it may be used in a multi-threaded environment and makes no guarantee that it shares a thread with CoordinatorEventProcessor. Under this assumption, directly accessing CoordinatorContext (which is marked as @NotThreadSafe) introduces a thread-safety model violation.
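   One possible direction (a sketch, not a concrete proposal): let the manager depend on a narrow, explicitly thread-safe accessor whose implementation the coordinator controls, instead of the whole context. The interface name below is hypothetical.

```java
// Hypothetical accessor; the coordinator could back it with an immutable snapshot or by
// routing the lookup through the coordinator event thread.
@ThreadSafe
public interface TableBucketCountAccessor {
    /** Returns the bucket count for the given table id. */
    int numBuckets(long tableId);
}
```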
   
   



##########
website/docs/maintenance/configuration.md:
##########
@@ -157,7 +158,7 @@ during the Fluss cluster working.
| kv.rocksdb.use-bloom-filter                       | Boolean    | true           | If true, every newly created SST file will contain a Bloom filter. It is enabled by default. |
| kv.rocksdb.bloom-filter.bits-per-key              | Double     | 10.0           | Bits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0. |
| kv.rocksdb.bloom-filter.block-based-mode          | Boolean    | false          | If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is `false`. |
-| kv.rocksdb.shared-rate-limiter-bytes-per-sec              | MemorySize | Long.MAX_VALUE | The bytes per second rate limit for RocksDB flush and compaction operations shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). Set to a lower value (e.g., 100MB) to limit the rate. This configuration can be updated dynamically without server restart. See [Updating Configs](operations/updating-configs.md) for more details. |
+| kv.rocksdb.shared-rate-limiter-bytes-per-sec      | MemorySize | Long.MAX_VALUE | The bytes per second rate limit for RocksDB flush and compaction operations shared across all RocksDB instances on the TabletServer. The rate limiter is always enabled. The default value is Long.MAX_VALUE (effectively unlimited). Set to a lower value (e.g., 100MB) to limit the rate. This configuration can be updated dynamically without server restart. See [Updating Configs](operations/updating-configs.md) for more details. |

Review Comment:
   Clean up unrelated changes.
   
   



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+    private final KvSnapshotLeaseMetadataManager metadataHelper;

Review Comment:
   Rename to metadataManager



##########
website/docs/maintenance/observability/monitor-metrics.md:
##########
@@ -325,6 +325,16 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
       <td>The total number of partitions in this cluster.</td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <td>kvSnapshotLeaseCount</td>
+      <td>The total number of kv snapshots in this cluster.</td>

Review Comment:
   The metric name does not match its description: `kvSnapshotLeaseCount` is described as the total number of kv snapshots, but it should describe the number of kv snapshot leases (or leased snapshots) in this cluster.
   
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -531,22 +543,96 @@ private List<SourceSplitBase> initPrimaryKeyTablePartitionSplits(
         List<SourceSplitBase> splits = new ArrayList<>();
         for (Partition partition : newPartitions) {
             String partitionName = partition.getPartitionName();
-            // get the table snapshot info
-            final KvSnapshots kvSnapshots;
-            try {
-                kvSnapshots = flussAdmin.getLatestKvSnapshots(tablePath, 
partitionName).get();
-            } catch (Exception e) {
-                throw new FlinkRuntimeException(
-                        String.format(
-                                "Failed to get table snapshot for table %s and 
partition %s",
-                                tablePath, partitionName),
-                        ExceptionUtils.stripCompletionException(e));
-            }
-            splits.addAll(getSnapshotAndLogSplits(kvSnapshots, partitionName));
+            splits.addAll(
+                    getSnapshotAndLogSplits(
+                            getLatestKvSnapshotsAndRegister(partitionName), 
partitionName));
         }
         return splits;
     }
 
+    private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String 
partitionName) {
+        long tableId;
+        Long partitionId;
+        Map<Integer, Long> snapshotIds = new HashMap<>();
+        Map<Integer, Long> logOffsets = new HashMap<>();
+
+        // retry to get the latest kv snapshots and acquire kvSnapshot lease 
util all buckets
+        // acquire success. The reason is that getLatestKvSnapshots and 
acquireKvSnapshotLease
+        // are not atomic operations, the latest kv snapshot obtained via get 
may become outdated by
+        // the time it is passed to acquire. Therefore, this logic must 
implement a retry
+        // mechanism: the unavailable tableBuckets in the 
AcquiredKvSnapshotLeaseResult returned by
+        // acquireKvSnapshotLease must be retried repeatedly until all buckets 
are successfully
+        // acquired.
+        try {
+            Set<TableBucket> remainingTableBuckets;
+            do {
+                KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName);
+                remainingTableBuckets = new 
HashSet<>(kvSnapshots.getTableBuckets());
+
+                tableId = kvSnapshots.getTableId();
+                partitionId = kvSnapshots.getPartitionId();
+
+                Set<TableBucket> ignoreBuckets = new HashSet<>();
+                Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+                for (TableBucket tb : remainingTableBuckets) {
+                    int bucket = tb.getBucket();
+                    OptionalLong snapshotIdOpt = 
kvSnapshots.getSnapshotId(bucket);
+                    OptionalLong logOffsetOpt = 
kvSnapshots.getLogOffset(bucket);
+                    if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) {
+                        bucketsToLease.put(tb, snapshotIdOpt.getAsLong());
+                    } else {
+                        ignoreBuckets.add(tb);
+                    }
+
+                    snapshotIds.put(
+                            bucket, snapshotIdOpt.isPresent() ? 
snapshotIdOpt.getAsLong() : null);
+                    logOffsets.put(
+                            bucket, logOffsetOpt.isPresent() ? 
logOffsetOpt.getAsLong() : null);
+                }
+
+                if (!ignoreBuckets.isEmpty()) {
+                    remainingTableBuckets.removeAll(ignoreBuckets);
+                }
+
+                if (!bucketsToLease.isEmpty()) {
+                    String kvSnapshotLeaseId = 
leaseContext.getKvSnapshotLeaseId();
+                    LOG.info(
+                            "Try to acquire kv snapshot lease {} for table {}",
+                            kvSnapshotLeaseId,
+                            tablePath);
+                    remainingTableBuckets =
+                            flussAdmin
+                                    .acquireKvSnapshotLease(
+                                            kvSnapshotLeaseId,
+                                            bucketsToLease,
+                                            
leaseContext.getKvSnapshotLeaseDurationMs())

Review Comment:
   acquireKvSnapshotLease() may throw a NullPointerException when leaseContext.getKvSnapshotLeaseDurationMs() returns null, because the nullable Long is auto-unboxed into the primitive long parameter.
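   A minimal guard at the call site could look like this (sketch only, using the names from the diff above; the error type and message are illustrative):
   ```java
   // Sketch: unbox the nullable duration explicitly so a missing value fails with a clear
   // message instead of an implicit NullPointerException during auto-unboxing.
   Long leaseDurationMs = leaseContext.getKvSnapshotLeaseDurationMs();
   if (leaseDurationMs == null) {
       throw new FlinkRuntimeException(
               "kv snapshot lease duration must be set when acquiring lease " + kvSnapshotLeaseId);
   }
   remainingTableBuckets =
           flussAdmin
                   .acquireKvSnapshotLease(kvSnapshotLeaseId, bucketsToLease, leaseDurationMs)
                   .get()
                   .getUnavailableTableBucketSet();
   ```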



##########
website/docs/engine-flink/options.md:
##########
@@ -91,20 +91,22 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties)
 
 ## Read Options
 
-| Option | Type | Default | Description |
-|--------|------|---------|-------------|
-| scan.startup.mode | Enum | full | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following `scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See the [Start Reading Position](engine-flink/reads.md#start-reading-position) for more details. |
-| scan.startup.timestamp | Long | (None) | The timestamp to start reading the data from. This option is only valid when `scan.startup.mode` is set to `timestamp`. The format is 'milli-second-since-epoch' or `yyyy-MM-dd HH:mm:ss`, like `1678883047356` or `2023-12-09 23:09:12`. |
-| scan.partition.discovery.interval | Duration | 1min | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. The default value is 1 minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large number of requests to ZooKeeper in server, this option cannot be set too small, as a small value would cause frequent requests and increase server load. In the future, once list partitions is optimized, the default value of this parameter can be reduced. |
-| client.scanner.log.check-crc | Boolean | true | Automatically check the CRC3 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. |
-| client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and returns them incrementally from each poll. |
-| client.scanner.log.fetch.max-bytes | MemorySize | 16mb | The maximum amount of data the server should return for a fetch request from client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not a absolute maximum. |
-| client.scanner.log.fetch.max-bytes-for-bucket | MemorySize | 1mb | The maximum amount of data the server should return for a table bucket in fetch request fom client. Records are fetched in batches, and the max bytes size is config by this option. |
-| client.scanner.log.fetch.min-bytes | MemorySize | 1b | The minimum bytes expected for each fetch log request from client to response. If not enough bytes, wait up to client.scanner.log.fetch-wait-max-time time to return. |
-| client.scanner.log.fetch.wait-max-time | Duration | 500ms | The maximum time to wait for enough bytes to be available for a fetch log request from client to response. |
-| client.scanner.io.tmpdir | String | System.getProperty("java.io.tmpdir") + "/fluss" | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily |
-| client.scanner.remote-log.prefetch-num | Integer | 4 | The number of remote log segments to keep in local temp file for LogScanner, which download from remote storage. The default setting is 4. |
-| client.remote-file.download-thread-num | Integer | 3 | The number of threads the client uses to download remote files. |
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| scan.startup.mode | Enum | full | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following `scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See the [Start Reading Position](engine-flink/reads.md#start-reading-position) for more details. |
+| scan.startup.timestamp | Long | (None) | The timestamp to start reading the data from. This option is only valid when `scan.startup.mode` is set to `timestamp`. The format is 'milli-second-since-epoch' or `yyyy-MM-dd HH:mm:ss`, like `1678883047356` or `2023-12-09 23:09:12`. |
+| scan.partition.discovery.interval | Duration | 1min | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. The default value is 1 minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large number of requests to ZooKeeper in server, this option cannot be set too small, as a small value would cause frequent requests and increase server load. In the future, once list partitions is optimized, the default value of this parameter can be reduced. |
+| scan.kv.snapshot.lease.id | String | UUID | The lease id to lease kv snapshots. If set, the acquired kv snapshots will not be deleted until the consumer finished consuming all the snapshots or the lease duration time is reached. If not set, an UUID will be set. |
+| scan.kv.snapshot.lease.duration | Duration | 1day | The time period how long to wait before expiring the kv snapshot lease to avoid kv snapshot blocking to delete. |
+| client.scanner.log.check-crc | Boolean | true | Automatically check the CRC3 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. |
+| client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and returns them incrementally from each poll. |
+| client.scanner.log.fetch.max-bytes | MemorySize | 16mb | The maximum amount of data the server should return for a fetch request from client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not a absolute maximum. |
+| client.scanner.log.fetch.max-bytes-for-bucket | MemorySize | 1mb | The maximum amount of data the server should return for a table bucket in fetch request fom client. Records are fetched in batches, and the max bytes size is config by this option. |
+| client.scanner.log.fetch.min-bytes | MemorySize | 1b | The minimum bytes expected for each fetch log request from client to response. If not enough bytes, wait up to client.scanner.log.fetch-wait-max-time time to return. |
+| client.scanner.log.fetch.wait-max-time | Duration | 500ms | The maximum time to wait for enough bytes to be available for a fetch log request from client to response. |
+| client.scanner.io.tmpdir | String | System.getProperty("java.io.tmpdir") + "/fluss" | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily |
+| client.scanner.remote-log.prefetch-num | Integer | 4 | The number of remote log segments to keep in local temp file for LogScanner, which download from remote storage. The default setting is 4. |

Review Comment:
   Clean up unrelated changes.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+    private final KvSnapshotLeaseMetadataManager metadataHelper;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> leaseLocks = 
MapUtils.newConcurrentHashMap();
+    /** lease id to kv snapshot lease. */
+    @GuardedBy("leaseLocks")
+    private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * KvSnapshotLeaseForBucket to the ref count, which means this table 
bucket + snapshotId has
+     * been leased by how many lease id.
+     */
+    private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                metadataHelper,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    @VisibleForTesting
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.metadataHelper = metadataHelper;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        LOG.info("kv snapshot lease manager has been started.");
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireLeases,
+                0L,
+                
conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public void initialize() throws Exception {
+        for (String leaseId : metadataHelper.getLeasesList()) {
+            Optional<KvSnapshotLease> kvSnapshotLeaseOpt = 
metadataHelper.getLease(leaseId);
+            if (kvSnapshotLeaseOpt.isPresent()) {
+                KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseOpt.get();
+                this.leaseLocks.put(leaseId, new ReentrantReadWriteLock());
+                this.kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+
+                initializeRefCount(kvSnapshotLease);
+
+                
leasedBucketCount.addAndGet(kvSnapshotLease.getLeasedSnapshotCount());
+            }
+        }
+    }
+
+    public boolean snapshotLeaseNotExist(KvSnapshotLeaseForBucket 
kvSnapshotLeaseForBucket) {
+        return inReadLock(
+                refCountLock,
+                () ->
+                        refCount.getOrDefault(kvSnapshotLeaseForBucket, new 
AtomicInteger(0)).get()
+                                <= 0);
+    }
+
+    /**
+     * Acquire kv snapshot lease.
+     *
+     * @param leaseId the lease id
+     * @param leaseDuration the lease duration
+     * @param tableIdToLeaseBucket the table id to lease bucket
+     * @return the map of unavailable snapshots that failed to be leased
+     */
+    public Map<TableBucket, Long> acquireLease(
+            String leaseId,
+            long leaseDuration,
+            Map<Long, List<KvSnapshotLeaseForBucket>> tableIdToLeaseBucket)
+            throws Exception {
+        ReadWriteLock lock = leaseLocks.computeIfAbsent(leaseId, k -> new 
ReentrantReadWriteLock());
+        return inWriteLock(
+                lock,
+                () -> {
+                    // To record the unavailable snapshots such as the kv 
snapshotId to lease not
+                    // exists.
+                    Map<TableBucket, Long> unavailableSnapshots = new 
HashMap<>();
+
+                    boolean update = kvSnapshotLeaseMap.containsKey(leaseId);
+                    KvSnapshotLease kvSnapshotLease;
+                    long newLeaseDuration = clock.milliseconds() + 
leaseDuration;
+                    if (!update) {
+                        // set the expiration time as: current time + 
leaseDuration
+                        kvSnapshotLease = new 
KvSnapshotLease(newLeaseDuration);
+                        kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+                        LOG.info(
+                                "kv snapshot lease '"
+                                        + leaseId
+                                        + "' has been acquired. The lease 
expiration time is "
+                                        + kvSnapshotLease.getExpirationTime());
+                    } else {
+                        kvSnapshotLease = kvSnapshotLeaseMap.get(leaseId);

Review Comment:
   Using containsKey() followed by get() is a bad pattern, as it entails 
querying the map multiple times. Moreover, this approach is overly complex and 
can be simplified using computeIfAbsent().
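   A single lookup keeps the existing `update` flag and drops the double query (sketch only; `computeIfAbsent()` also works if the new-vs-existing distinction is derived some other way):
   ```java
   // Sketch: one map lookup instead of containsKey() followed by get().
   KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseMap.get(leaseId);
   boolean update = kvSnapshotLease != null;
   if (!update) {
       kvSnapshotLease = new KvSnapshotLease(clock.milliseconds() + leaseDuration);
       kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
       LOG.info(
               "kv snapshot lease '{}' has been acquired. The lease expiration time is {}",
               leaseId,
               kvSnapshotLease.getExpirationTime());
   }
   ```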
   
   



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+    private final KvSnapshotLeaseMetadataManager metadataHelper;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> leaseLocks = 
MapUtils.newConcurrentHashMap();
+    /** lease id to kv snapshot lease. */
+    @GuardedBy("leaseLocks")
+    private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * KvSnapshotLeaseForBucket to the ref count, which means this table 
bucket + snapshotId has
+     * been leased by how many lease id.
+     */
+    private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                metadataHelper,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    @VisibleForTesting
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.metadataHelper = metadataHelper;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        LOG.info("kv snapshot lease manager has been started.");
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireLeases,
+                0L,
+                
conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public void initialize() throws Exception {
+        for (String leaseId : metadataHelper.getLeasesList()) {
+            Optional<KvSnapshotLease> kvSnapshotLeaseOpt = 
metadataHelper.getLease(leaseId);
+            if (kvSnapshotLeaseOpt.isPresent()) {
+                KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseOpt.get();
+                this.leaseLocks.put(leaseId, new ReentrantReadWriteLock());
+                this.kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+
+                initializeRefCount(kvSnapshotLease);
+
+                
leasedBucketCount.addAndGet(kvSnapshotLease.getLeasedSnapshotCount());
+            }
+        }
+    }
+
+    public boolean snapshotLeaseNotExist(KvSnapshotLeaseForBucket 
kvSnapshotLeaseForBucket) {
+        return inReadLock(
+                refCountLock,
+                () ->
+                        refCount.getOrDefault(kvSnapshotLeaseForBucket, new 
AtomicInteger(0)).get()

Review Comment:
   Creating a new AtomicInteger on every lookup just to supply a default value is inefficient; reuse the ZERO constant defined above, or use a plain get() with a null check.
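   For example (sketch only):
   ```java
   // Sketch: a plain get() avoids allocating a throwaway AtomicInteger on every call.
   return inReadLock(
           refCountLock,
           () -> {
               AtomicInteger count = refCount.get(kvSnapshotLeaseForBucket);
               return count == null || count.get() <= 0;
           });
   ```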



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LeaseContext.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.reader;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Context for lease. */
+public class LeaseContext implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // kv snapshot lease id. null for log table.
+    private final @Nullable String kvSnapshotLeaseId;
+
+    // kv snapshot lease duration. null for log table.
+    private final @Nullable Long kvSnapshotLeaseDurationMs;
+
+    public LeaseContext(
+            @Nullable String kvSnapshotLeaseId, @Nullable Long 
kvSnapshotLeaseDurationMs) {
+        this.kvSnapshotLeaseId = kvSnapshotLeaseId;
+        this.kvSnapshotLeaseDurationMs = kvSnapshotLeaseDurationMs;
+    }
+
+    public @Nullable String getKvSnapshotLeaseId() {
+        return kvSnapshotLeaseId;
+    }
+
+    public @Nullable Long getKvSnapshotLeaseDurationMs() {

Review Comment:
   Why allow nullable?



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/DropKvSnapshotLeaseProcedure.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/** Procedure to drop kv snapshot lease. */
+public class DropKvSnapshotLeaseProcedure extends ProcedureBase {

Review Comment:
   I recommend updating the API name. Although it differs slightly from the FIP, it should be fine with an explanatory comment.
   
   



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLeaseMetadataManager.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lease;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FSDataOutputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketSnapshot;
+import org.apache.fluss.utils.FlussPaths;
+import org.apache.fluss.utils.IOUtils;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * The manager to handle {@link KvSnapshotLease} to register/update/delete 
metadata from zk and
+ * remote fs.
+ */
+public class KvSnapshotLeaseMetadataManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotLeaseMetadataManager.class);
+
+    private final ZooKeeperClient zkClient;
+    private final String remoteDataDir;
+
+    public KvSnapshotLeaseMetadataManager(ZooKeeperClient zkClient, String 
remoteDataDir) {
+        this.zkClient = zkClient;
+        this.remoteDataDir = remoteDataDir;
+    }
+
+    public List<String> getLeasesList() throws Exception {
+        return zkClient.getKvSnapshotLeasesList();
+    }
+
+    /**
+     * Register a new kv snapshot lease to zk and remote fs.
+     *
+     * @param leaseId the lease id.
+     * @param lease the kv snapshot lease.
+     */
+    public void registerLease(String leaseId, KvSnapshotLease lease) throws 
Exception {
+        Map<Long, FsPath> tableIdToRemoteMetadataFsPath = 
generateMetadataFile(leaseId, lease);
+
+        // generate remote fsPath of metadata.
+        KvSnapshotLeaseMetadata leaseMetadata =
+                new KvSnapshotLeaseMetadata(
+                        lease.getExpirationTime(), 
tableIdToRemoteMetadataFsPath);
+
+        // register kv snapshot metadata to zk.
+        try {
+            zkClient.registerKvSnapshotLeaseMetadata(leaseId, leaseMetadata);
+        } catch (Exception e) {
+            LOG.warn("Failed to register kv snapshot lease metadata to zk.", 
e);
+            leaseMetadata.discard();
+            throw e;
+        }
+    }
+
+    /**
+     * Update a kv snapshot lease to zk and remote fs.
+     *
+     * @param leaseId the lease id.
+     * @param kvSnapshotLease the kv snapshot lease.
+     */
+    public void updateLease(String leaseId, KvSnapshotLease kvSnapshotLease) 
throws Exception {
+        // TODO change this to incremental update to avoid create too many 
remote metadata files.
+
+        Optional<KvSnapshotLeaseMetadata> originalLeaseMetadata =
+                zkClient.getKvSnapshotLeaseMetadata(leaseId);
+
+        Map<Long, FsPath> tableIdToNewRemoteMetadataFsPath =
+                generateMetadataFile(leaseId, kvSnapshotLease);
+
+        // generate  new kv snapshot lease metadata.
+        KvSnapshotLeaseMetadata newLeaseMetadata =
+                new KvSnapshotLeaseMetadata(
+                        kvSnapshotLease.getExpirationTime(), 
tableIdToNewRemoteMetadataFsPath);
+        // register new snapshot metadata to zk.
+        try {
+            zkClient.updateKvSnapshotLeaseMetadata(leaseId, newLeaseMetadata);
+        } catch (Exception e) {
+            LOG.warn("Failed to update kv snapshot lease metadata to zk.", e);
+            newLeaseMetadata.discard();

Review Comment:
   Perhaps we should have a mechanism to scan and clean up orphaned lease files 
during initialization.
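   A rough sketch of what that could look like, assuming both the files referenced by live leases and the files present under the lease metadata directory can be listed, and assuming `FileSystem#delete(path, recursive)` behaves as usual (the helper itself is hypothetical):
   ```java
   // Hypothetical helper: delete lease metadata files that no registered lease references.
   private void cleanUpOrphanedLeaseFiles(
           FileSystem fileSystem, Set<FsPath> referencedFiles, List<FsPath> presentFiles)
           throws Exception {
       for (FsPath file : presentFiles) {
           if (!referencedFiles.contains(file)) {
               LOG.info("Deleting orphaned kv snapshot lease metadata file {}.", file);
               fileSystem.delete(file, false);
           }
       }
   }
   ```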



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java:
##########
@@ -200,14 +208,21 @@ interface SubsumeAction {
         void subsume(CompletedSnapshot snapshot) throws Exception;
     }
 
-    private static boolean canSubsume(CompletedSnapshot next, 
CompletedSnapshot latest) {
+    /** A function to determine whether a snapshot can be subsumed. */
+    @FunctionalInterface
+    public interface CanSubsume {

Review Comment:
   “CanSubsume” as a class name isn’t ideal: it reads more like a predicate method (e.g., canSubsume()) than a noun that represents a type or entity. In object-oriented design, class names should typically be nouns that clearly convey what the class is or represents, e.g. SubsumptionChecker.
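   For instance, keeping the existing `(next, latest)` signature, something along these lines (the name is only a suggestion):
   ```java
   /** Decides whether a completed snapshot can be subsumed once a newer one is available. */
   @FunctionalInterface
   public interface SubsumptionChecker {
       boolean canSubsume(CompletedSnapshot next, CompletedSnapshot latest);
   }
   ```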



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -531,22 +543,96 @@ private List<SourceSplitBase> initPrimaryKeyTablePartitionSplits(
         List<SourceSplitBase> splits = new ArrayList<>();
         for (Partition partition : newPartitions) {
             String partitionName = partition.getPartitionName();
-            // get the table snapshot info
-            final KvSnapshots kvSnapshots;
-            try {
-                kvSnapshots = flussAdmin.getLatestKvSnapshots(tablePath, 
partitionName).get();
-            } catch (Exception e) {
-                throw new FlinkRuntimeException(
-                        String.format(
-                                "Failed to get table snapshot for table %s and 
partition %s",
-                                tablePath, partitionName),
-                        ExceptionUtils.stripCompletionException(e));
-            }
-            splits.addAll(getSnapshotAndLogSplits(kvSnapshots, partitionName));
+            splits.addAll(
+                    getSnapshotAndLogSplits(
+                            getLatestKvSnapshotsAndRegister(partitionName), 
partitionName));
         }
         return splits;
     }
 
+    private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String 
partitionName) {
+        long tableId;
+        Long partitionId;
+        Map<Integer, Long> snapshotIds = new HashMap<>();
+        Map<Integer, Long> logOffsets = new HashMap<>();
+
+        // retry to get the latest kv snapshots and acquire kvSnapshot lease 
util all buckets
+        // acquire success. The reason is that getLatestKvSnapshots and 
acquireKvSnapshotLease
+        // are not atomic operations, the latest kv snapshot obtained via get 
may become outdated by
+        // the time it is passed to acquire. Therefore, this logic must 
implement a retry
+        // mechanism: the unavailable tableBuckets in the 
AcquiredKvSnapshotLeaseResult returned by
+        // acquireKvSnapshotLease must be retried repeatedly until all buckets 
are successfully
+        // acquired.
+        try {
+            Set<TableBucket> remainingTableBuckets;
+            do {
+                KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName);
+                remainingTableBuckets = new 
HashSet<>(kvSnapshots.getTableBuckets());
+
+                tableId = kvSnapshots.getTableId();
+                partitionId = kvSnapshots.getPartitionId();
+
+                Set<TableBucket> ignoreBuckets = new HashSet<>();
+                Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+                for (TableBucket tb : remainingTableBuckets) {
+                    int bucket = tb.getBucket();
+                    OptionalLong snapshotIdOpt = 
kvSnapshots.getSnapshotId(bucket);
+                    OptionalLong logOffsetOpt = 
kvSnapshots.getLogOffset(bucket);
+                    if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) {
+                        bucketsToLease.put(tb, snapshotIdOpt.getAsLong());
+                    } else {
+                        ignoreBuckets.add(tb);
+                    }
+
+                    snapshotIds.put(
+                            bucket, snapshotIdOpt.isPresent() ? 
snapshotIdOpt.getAsLong() : null);
+                    logOffsets.put(
+                            bucket, logOffsetOpt.isPresent() ? 
logOffsetOpt.getAsLong() : null);
+                }
+
+                if (!ignoreBuckets.isEmpty()) {
+                    remainingTableBuckets.removeAll(ignoreBuckets);
+                }
+
+                if (!bucketsToLease.isEmpty()) {
+                    String kvSnapshotLeaseId = 
leaseContext.getKvSnapshotLeaseId();
+                    LOG.info(
+                            "Try to acquire kv snapshot lease {} for table {}",
+                            kvSnapshotLeaseId,
+                            tablePath);
+                    remainingTableBuckets =
+                            flussAdmin
+                                    .acquireKvSnapshotLease(
+                                            kvSnapshotLeaseId,
+                                            bucketsToLease,
+                                            
leaseContext.getKvSnapshotLeaseDurationMs())
+                                    .get()
+                                    .getUnavailableTableBucketSet();
+                    if (!remainingTableBuckets.isEmpty()) {
+                        LOG.info(
+                                "Failed to acquire kv snapshot lease for table 
{}: {}. Retry to re-acquire",
+                                tablePath,
+                                remainingTableBuckets);
+                    }
+                }
+            } while (!remainingTableBuckets.isEmpty());

Review Comment:
   Do not use infinite retry loops, and add a delay between requests to avoid 
retry storms.
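   A bounded variant could look roughly like this (sketch only; `MAX_LEASE_ACQUIRE_RETRIES` and `LEASE_ACQUIRE_RETRY_BACKOFF_MS` are hypothetical constants):
   ```java
   // Sketch: cap the number of attempts and back off between them instead of looping forever.
   int attempts = 0;
   do {
       if (attempts > 0) {
           Thread.sleep(LEASE_ACQUIRE_RETRY_BACKOFF_MS);
       }
       if (++attempts > MAX_LEASE_ACQUIRE_RETRIES) {
           throw new FlinkRuntimeException(
                   String.format(
                           "Failed to acquire kv snapshot lease for table %s, remaining buckets: %s",
                           tablePath, remainingTableBuckets));
       }
       // ... existing getLatestKvSnapshots + acquireKvSnapshotLease logic ...
   } while (!remainingTableBuckets.isEmpty());
   ```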
   
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java:
##########
@@ -51,6 +57,11 @@ public class FlinkSourceReader<OUT>
         extends SingleThreadMultiplexSourceReaderBaseAdapter<
                 RecordAndPos, OUT, SourceSplitBase, SourceSplitState> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSourceReader.class);
+
+    /** the tableBuckets ignore to send FinishedKvSnapshotConsumeEvent as it 
already sending. */
+    private final Set<TableBucket> ignoreBuckets;

Review Comment:
   The variable name does not clearly indicate its purpose or usage.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+    private final KvSnapshotLeaseMetadataManager metadataHelper;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> leaseLocks = MapUtils.newConcurrentHashMap();
+    /** lease id to kv snapshot lease. */
+    @GuardedBy("leaseLocks")
+    private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * KvSnapshotLeaseForBucket to the ref count, which means this table bucket + snapshotId has
+     * been leased by how many lease ids.
+     */
+    private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();

Review Comment:
   The current state management design using Map<KvSnapshotLeaseForBucket, 
AtomicInteger> refCount is inefficient. I strongly recommend using 
Map<TableBucket, ConcurrentSet<String /* leaseId */>> instead, for two reasons:
   ① Every method that accesses this map in the code must instantiate a new 
KvSnapshotLeaseForBucket object, creating unnecessary GC pressure.
   ② Maintaining the correctness of refCount is extremely difficult, which 
severely degrades code readability and leads to code rot.
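   
   A rough sketch of that suggestion, assuming ConcurrentHashMap.newKeySet() as the concurrent
set and keying on TableBucket only, as proposed; the helper names are illustrative, not a
drop-in patch:
   
   /** TableBucket -> the lease ids currently holding a kv snapshot lease on that bucket. */
   private final Map<TableBucket, Set<String>> bucketLeaseHolders = MapUtils.newConcurrentHashMap();
   
   private void addLeaseHolder(TableBucket bucket, String leaseId) {
       bucketLeaseHolders
               .computeIfAbsent(bucket, b -> ConcurrentHashMap.newKeySet())
               .add(leaseId);
   }
   
   private void removeLeaseHolder(TableBucket bucket, String leaseId) {
       Set<String> holders = bucketLeaseHolders.get(bucket);
       if (holders != null) {
           holders.remove(leaseId);
           if (holders.isEmpty()) {
               // best-effort cleanup: removes the entry only if it is still mapped to this set
               bucketLeaseHolders.remove(bucket, holders);
           }
       }
   }
   
   private boolean isBucketLeased(TableBucket bucket) {
       Set<String> holders = bucketLeaseHolders.get(bucket);
       return holders != null && !holders.isEmpty();
   }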



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotTableLease.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lease;
+
+import org.apache.fluss.utils.MapUtils;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/** The lease of kv snapshot for a table. */
+@NotThreadSafe
+public class KvSnapshotTableLease {
+    private final long tableId;
+    private final @Nullable Long[] bucketSnapshots;
+    private final Map<Long, Long[]> partitionSnapshots;
+
+    public KvSnapshotTableLease(long tableId) {
+        this(tableId, null, MapUtils.newConcurrentHashMap());
+    }
+
+    public KvSnapshotTableLease(long tableId, Long[] bucketSnapshots) {
+        this(tableId, bucketSnapshots, Collections.emptyMap());
+    }
+
+    public KvSnapshotTableLease(long tableId, Map<Long, Long[]> partitionSnapshots) {
+        this(tableId, null, partitionSnapshots);
+    }
+
+    public KvSnapshotTableLease(
+            long tableId, @Nullable Long[] bucketSnapshots, Map<Long, Long[]> partitionSnapshots) {
+        this.tableId = tableId;
+        this.bucketSnapshots = bucketSnapshots;
+        this.partitionSnapshots = partitionSnapshots;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public @Nullable Long[] getBucketSnapshots() {
+        return bucketSnapshots;
+    }
+
+    public @Nullable Long[] getBucketSnapshots(long partitionId) {
+        return partitionSnapshots.get(partitionId);
+    }
+
+    public Map<Long, Long[]> getPartitionSnapshots() {
+        return partitionSnapshots;
+    }
+
+    public void addPartitionSnapshots(long partitionId, Long[] snapshots) {
+        if (bucketSnapshots != null) {
+            throw new IllegalStateException("This is a non-partitioned table lease.");
+        }
+        partitionSnapshots.put(partitionId, snapshots);
+    }
+
+    public int getLeasedSnapshotCount() {
+        int count = 0;
+        if (bucketSnapshots != null) {
+            for (Long snapshot : bucketSnapshots) {
+                if (snapshot != -1L) {
+                    count++;
+                }
+            }
+        } else {
+            for (Long[] snapshots : partitionSnapshots.values()) {
+                for (Long snapshot : snapshots) {
+                    if (snapshot != -1L) {
+                        count++;
+                    }
+                }
+            }
+        }
+        return count;
+    }
+
+    @Override
+    public String toString() {
+        String partitionSnapshotsStr = formatLongArrayMap(partitionSnapshots);
+        return "KvSnapshotTableLease{"
+                + "tableId="
+                + tableId
+                + ", bucketSnapshots="
+                + Arrays.toString(bucketSnapshots)
+                + ", partitionSnapshots="
+                + partitionSnapshotsStr
+                + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KvSnapshotTableLease that = (KvSnapshotTableLease) o;
+        return tableId == that.tableId
+                && Arrays.equals(bucketSnapshots, that.bucketSnapshots)
+                && deepEqualsMapOfArrays(partitionSnapshots, that.partitionSnapshots);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(tableId);
+        result = 31 * result + Arrays.hashCode(bucketSnapshots);
+        result = 31 * result + deepHashCodeMapOfArrays(partitionSnapshots);
+        return result;
+    }
+
+    private static String formatLongArrayMap(Map<Long, Long[]> map) {
+        if (map == null) {
+            return "null";
+        }
+        StringBuilder sb = new StringBuilder("{");
+        boolean first = true;
+        for (Map.Entry<Long, Long[]> entry : map.entrySet()) {
+            if (!first) {
+                sb.append(", ");
+            }
+            sb.append(entry.getKey()).append("=").append(Arrays.toString(entry.getValue()));
+            first = false;
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+
+    private static boolean deepEqualsMapOfArrays(Map<Long, Long[]> map1, Map<Long, Long[]> map2) {
+        if (map1 == map2) {
+            return true;
+        }
+        if (map1 == null || map2 == null || map1.size() != map2.size()) {
+            return false;
+        }
+
+        for (Map.Entry<Long, Long[]> entry : map1.entrySet()) {
+            Long key = entry.getKey();
+            Long[] value1 = entry.getValue();
+            Long[] value2 = map2.get(key);
+
+            if (value2 == null) {
+                return false;
+            }
+
+            if (!Arrays.equals(value1, value2)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static int deepHashCodeMapOfArrays(Map<Long, Long[]> map) {
+        if (map == null) {
+            return 0;
+        }
+        int hash = 0;
+        for (Map.Entry<Long, Long[]> entry : map.entrySet()) {
+            Long key = entry.getKey();
+            Long[] value = entry.getValue();
+            // Combine key hash and array content hash
+            hash += Objects.hashCode(key) ^ Arrays.hashCode(value);

Review Comment:
   hash = 31 * hash + (Objects.hashCode(key) ^ Arrays.hashCode(value));



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+    private final KvSnapshotLeaseMetadataManager metadataHelper;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> leaseLocks = MapUtils.newConcurrentHashMap();
+    /** lease id to kv snapshot lease. */
+    @GuardedBy("leaseLocks")
+    private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * KvSnapshotLeaseForBucket to the ref count, which means this table bucket + snapshotId has
+     * been leased by how many lease ids.
+     */
+    private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                metadataHelper,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    @VisibleForTesting
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.metadataHelper = metadataHelper;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        LOG.info("kv snapshot lease manager has been started.");
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireLeases,
+                0L,
+                conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public void initialize() throws Exception {
+        for (String leaseId : metadataHelper.getLeasesList()) {
+            Optional<KvSnapshotLease> kvSnapshotLeaseOpt = metadataHelper.getLease(leaseId);

Review Comment:
   What I mean is, after a standby node is initialized, can it stay consistent 
with the primary node? And if a failover occurs, can it correctly handle the 
business logic?
   
   



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+    private final KvSnapshotLeaseMetadataManager metadataHelper;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> leaseLocks = MapUtils.newConcurrentHashMap();
+    /** lease id to kv snapshot lease. */
+    @GuardedBy("leaseLocks")
+    private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * KvSnapshotLeaseForBucket to the ref count, which means this table bucket + snapshotId has
+     * been leased by how many lease ids.
+     */
+    private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                metadataHelper,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    @VisibleForTesting
+    public KvSnapshotLeaseManager(
+            Configuration conf,
+            KvSnapshotLeaseMetadataManager metadataHelper,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.metadataHelper = metadataHelper;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        LOG.info("kv snapshot lease manager has been started.");
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireLeases,
+                0L,
+                conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public void initialize() throws Exception {
+        for (String leaseId : metadataHelper.getLeasesList()) {
+            Optional<KvSnapshotLease> kvSnapshotLeaseOpt = metadataHelper.getLease(leaseId);
+            if (kvSnapshotLeaseOpt.isPresent()) {
+                KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseOpt.get();
+                this.leaseLocks.put(leaseId, new ReentrantReadWriteLock());
+                this.kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+
+                initializeRefCount(kvSnapshotLease);
+
+                leasedBucketCount.addAndGet(kvSnapshotLease.getLeasedSnapshotCount());
+            }
+        }
+    }
+
+    public boolean snapshotLeaseNotExist(KvSnapshotLeaseForBucket kvSnapshotLeaseForBucket) {
+        return inReadLock(
+                refCountLock,
+                () ->
+                        refCount.getOrDefault(kvSnapshotLeaseForBucket, new AtomicInteger(0)).get()
+                                <= 0);
+    }
+
+    /**
+     * Acquire kv snapshot lease.
+     *
+     * @param leaseId the lease id
+     * @param leaseDuration the lease duration
+     * @param tableIdToLeaseBucket the table id to lease bucket
+     * @return the map of unavailable snapshots that failed to be leased
+     */
+    public Map<TableBucket, Long> acquireLease(
+            String leaseId,
+            long leaseDuration,
+            Map<Long, List<KvSnapshotLeaseForBucket>> tableIdToLeaseBucket)
+            throws Exception {
+        ReadWriteLock lock = leaseLocks.computeIfAbsent(leaseId, k -> new ReentrantReadWriteLock());
+        return inWriteLock(
+                lock,
+                () -> {
+                    // Records the unavailable snapshots, e.g. when the kv snapshotId to lease
+                    // does not exist.
+                    Map<TableBucket, Long> unavailableSnapshots = new HashMap<>();
+
+                    boolean update = kvSnapshotLeaseMap.containsKey(leaseId);
+                    KvSnapshotLease kvSnapshotLease;
+                    long newLeaseDuration = clock.milliseconds() + leaseDuration;
+                    if (!update) {
+                        // set the expiration time as: current time + leaseDuration
+                        kvSnapshotLease = new KvSnapshotLease(newLeaseDuration);
+                        kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+                        LOG.info(

Review Comment:
   Don't use string concatenation to print logs.
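   
   For reference, the SLF4J parameterized form; the message text here is illustrative, but the
point is that placeholders are only rendered when the log level is enabled:
   
   // preferred: lazy formatting via placeholders
   LOG.info("Acquired kv snapshot lease {} expiring at {}", leaseId, newLeaseDuration);
   
   // avoid: the string is built eagerly even when INFO is disabled
   LOG.info("Acquired kv snapshot lease " + leaseId + " expiring at " + newLeaseDuration);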
   
   



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+    private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+    private final KvSnapshotLeaseMetadataManager metadataHelper;
+    private final CoordinatorContext coordinatorContext;

Review Comment:
   In practice we only access coordinatorContext within acquireLease, and acquireLease is
invoked exclusively by coordinatorProcessor, so there is no actual thread-safety issue today.
However, that guarantee is implicit and obscure, which is a code smell: other maintainers of
the codebase could easily misunderstand it or inadvertently modify the code and break the
implicit safety assumption. While refactoring this properly might be costly, we should at
least add comments that explicitly document the thread-safety constraints here.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

