luoyuxia commented on code in PR #3286:
URL: https://github.com/apache/fluss/pull/3286#discussion_r3212825081


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -245,11 +307,23 @@ public void addReader(int subtaskId) {
                     int globalMaxAttempt = max(maxAttempts);
                     if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) {
                         LOG.info(
-                                "Failover completed. All {} subtasks reached the same attempt number {}. Current registered readers are {}",
+                                "All {} subtasks reached the same attempt number {}. Current registered readers are {}. Waiting for failed-table report to complete before clearing failover state.",

Review Comment:
   why change this failover logic?



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -313,10 +391,18 @@ private void handleSourceReaderFailOver() {
                 tieringTableEpochs);
         // we need to make all as failed
         failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
+        // release all currently leased buckets for tables that are being marked failed;
+        // take a snapshot of the keys first to avoid concurrent modification.
+        Set<Long> tableIdsToRelease = new HashSet<>(tieringTableEpochs.keySet());
         tieringTableEpochs.clear();
         tieringReachMaxDurationsTables.clear();
         // also clean all pending splits since we mark all as failed
         pendingSplits.clear();
+        // Release leases asynchronously to avoid blocking the coordinator thread when
+        // multiple tables are involved and RPC calls may time out.
+        for (Long tableId : tableIdsToRelease) {

Review Comment:
   Can we just release everything in `leasedBucketsByTable`, so that we don't need to collect `tableIdsToRelease`?
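   A minimal sketch of this suggestion, assuming `leasedBucketsByTable` is a `ConcurrentHashMap` (its declaration is not shown in this diff), using `String` bucket ids in place of `TableBucket`, and modelling the release RPC with a hypothetical `LeaseReleaser` interface rather than the Fluss admin API. Because `ConcurrentHashMap` iterators are weakly consistent, entries can be removed while iterating, so no separate `tableIdsToRelease` copy is needed.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the lease-release RPC; not the Fluss admin API.
interface LeaseReleaser {
    void release(long tableId, Set<String> buckets) throws Exception;
}

final class FailoverLeaseRelease {

    /**
     * Releases every tracked lease straight from the map. ConcurrentHashMap iterators are
     * weakly consistent, so removing entries while iterating is safe and no key copy is needed.
     *
     * @return number of tables whose leases were released
     */
    static int releaseAll(
            ConcurrentHashMap<Long, Set<String>> leasedBucketsByTable, LeaseReleaser releaser) {
        int released = 0;
        for (Map.Entry<Long, Set<String>> entry : leasedBucketsByTable.entrySet()) {
            try {
                releaser.release(entry.getKey(), entry.getValue());
                // Forget the table only after the release succeeded.
                leasedBucketsByTable.remove(entry.getKey(), entry.getValue());
                released++;
            } catch (Exception e) {
                // Keep the entry so that a later attempt can retry it.
            }
        }
        return released;
    }
}
```

   A table whose release fails stays in the map, so a later pass over the same map can retry it.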
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -96,6 +102,19 @@ public class TieringSourceEnumerator
 
     private static final Logger LOG = LoggerFactory.getLogger(TieringSourceEnumerator.class);
 
+    /**
+     * KV snapshot lease duration for the whole tiering job. One lease covers the entire job
+     * lifecycle; it is renewed implicitly by every {@code acquireSnapshots} call, so a relatively
+     * long duration is safe and also bounds the worst-case leaked-lease lifetime if the job dies
+     * abnormally.
+     *
+     * <p>TODO: introduce an explicit periodic lease-renewal mechanism so that a single tiering
+     * round that exceeds {@link #KV_SNAPSHOT_LEASE_DURATION_MS} (e.g. for very large tables) will

Review Comment:
   nit: Javadoc pointing to itself 



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java:
##########
@@ -696,13 +705,25 @@ private void waitUntilTieringTableSplitAssignmentReady(
             int expectedSplitsNum,
             long sleepMs)
             throws Throwable {
+        long startTime = System.currentTimeMillis();
+        long timeoutMs = 10000; // 10秒超时 (10-second timeout)
+
         while (context.getSplitsAssignmentSequence().size() < expectedSplitsNum) {
+            if (System.currentTimeMillis() - startTime > timeoutMs) {
+                throw new AssertionError(
+                        String.format(
+                                "等待分配超时: 期望 %d 个分配, 实际 %d 个分配, 超时时间 %dms", // i.e. "Timed out waiting for assignments: expected %d, actual %d, timeout %dms"

Review Comment:
   why change this?
   Also, please use English.
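   If a timeout guard is kept in this test helper, a minimal sketch of a bounded polling loop with an English failure message might look like the following. `Waits.waitUntil` is a hypothetical helper, not an existing Fluss test utility; it uses `System.nanoTime()` because, unlike `System.currentTimeMillis()`, it is monotonic and unaffected by wall-clock adjustments.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

final class Waits {

    /**
     * Polls {@code condition} every {@code sleepMs} until it holds, failing with an English
     * message once {@code timeoutMs} has elapsed.
     */
    static void waitUntil(
            BooleanSupplier condition, long timeoutMs, long sleepMs, Supplier<String> description) {
        // nanoTime is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline > 0) {
                throw new AssertionError(
                        String.format(
                                "Timed out after %d ms waiting for: %s",
                                timeoutMs, description.get()));
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before failing the test.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for: " + description.get(), e);
            }
        }
    }
}
```

   In the enumerator test, the description supplier could format the expected and actual assignment counts.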



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -184,6 +233,19 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
         LOG.info("TieringSourceReader {} requests split.", subtaskId);
         readersAwaitingSplit.add(subtaskId);
 
+        // During failover we must not request a new tiering table or assign any splits.
+        // Otherwise we might re-acquire leases for a table whose leases were just released by
+        // handleSourceReaderFailOver, or assign splits to subtasks whose readers from the new
+        // attempt have not been fully registered yet. The pending failed-table report and the
+        // subsequent split request will be driven by the periodic callAsync once failover is
+        // marked complete.
+        // marked complete.
+        if (isFailOvering) {

Review Comment:
   why change this failover logic?
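   The behaviour described in the diff comment (record split requests while failover is in progress and serve them only once failover is marked complete) can be modelled in isolation as below. `FailoverGate` and its methods are hypothetical names for illustration; the real enumerator also requests tiering tables and assigns splits, which the sketch collapses into a single `assignPending` step.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of the failover gating; not the Fluss TieringSourceEnumerator. */
final class FailoverGate {

    private final Set<Integer> readersAwaitingSplit = new LinkedHashSet<>();
    private final List<Integer> assignedReaders = new ArrayList<>();
    private boolean isFailOvering;

    void handleSplitRequest(int subtaskId) {
        readersAwaitingSplit.add(subtaskId);
        if (isFailOvering) {
            // Remember the request, but do not lease tables or assign splits yet.
            return;
        }
        assignPending();
    }

    void startFailover() {
        isFailOvering = true;
    }

    /** Invoked, e.g. from a periodic callback, once all readers of the new attempt registered. */
    void completeFailover() {
        isFailOvering = false;
        assignPending();
    }

    private void assignPending() {
        // Stand-in for "request a tiering table and assign its splits".
        assignedReaders.addAll(readersAwaitingSplit);
        readersAwaitingSplit.clear();
    }

    List<Integer> assignedReaders() {
        return assignedReaders;
    }
}
```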



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +641,208 @@ public void close() throws IOException {
         }
     }
 
+    /**
+     * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the given table so that
+     * snapshots referenced by these splits will not be cleaned up by the Fluss server during
+     * tiering. Bucket-snapshot mappings are remembered in {@link #leasedBucketsByTable} so they can
+     * be released on finish/fail.
+     *
+     * <p>Falls back to a warning (no exception thrown) when the server does not support the kv
+     * snapshot lease API, to preserve compatibility with older Fluss clusters.
+     */
+    private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit> tieringSplits) {
+        if (flussAdmin == null) {
+            return;
+        }
+        Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+        for (TieringSplit split : tieringSplits) {
+            if (split.isTieringSnapshotSplit()) {
+                TieringSnapshotSplit snapshotSplit = split.asTieringSnapshotSplit();
+                bucketsToLease.put(snapshotSplit.getTableBucket(), snapshotSplit.getSnapshotId());
+            }
+        }
+        if (bucketsToLease.isEmpty()) {
+            return;
+        }
+        LOG.info(
+                "Try to acquire kv snapshot lease {} for tiering table {} with {} buckets.",
+                kvSnapshotLeaseId,
+                tableId,
+                bucketsToLease.size());
+        try {
+            Set<TableBucket> unavailableBuckets =
+                    flussAdmin
+                            .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
+                            .acquireSnapshots(bucketsToLease)
+                            .get()
+                            .getUnavailableTableBucketSet();
+            // Only record successfully leased buckets so we don't later try to release
+            // buckets that were never actually acquired (e.g. snapshots already GC'ed or
+            // missing on the server).
+            Set<TableBucket> acquiredBuckets = new HashSet<>(bucketsToLease.keySet());
+            if (!unavailableBuckets.isEmpty()) {
+                LOG.warn(
+                        "Failed to acquire kv snapshot lease for {} of {} buckets of tiering "
+                                + "table {}: {}. The corresponding snapshots may have already "
+                                + "been garbage-collected; tiering for those buckets may fail "
+                                + "later when the snapshots are accessed.",
+                        unavailableBuckets.size(),
+                        bucketsToLease.size(),
+                        tableId,
+                        unavailableBuckets);
+                acquiredBuckets.removeAll(unavailableBuckets);
+            }
+            if (!acquiredBuckets.isEmpty()) {
+                leasedBucketsByTable
+                        .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet())
+                        .addAll(acquiredBuckets);
+            }
+        } catch (Exception e) {
+            if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) {
+                LOG.warn(
+                        "Failed to acquire kv snapshot lease for tiering table {} because the "
+                                + "server does not support kv snapshot lease API. Snapshots may "
+                                + "be cleaned up earlier than expected. Please upgrade the Fluss "
+                                + "server to version 0.9 or later.",
+                        tableId,
+                        e);
+            } else {
+                LOG.error(
+                        "Failed to acquire kv snapshot lease for tiering table {}. "
+                                + "Tiering will proceed without snapshot protection; the "
+                                + "snapshot may be garbage-collected while tiering is in progress.",
+                        tableId,
+                        e);
+            }
+        }
+    }
+
+    /**
+     * Release the kv snapshot lease held for a specific table. Called when a table finishes
+     * tiering, fails, or is abandoned due to failover. Missing leases (log-only tables, or tables
+     * for which acquire failed) are handled as no-ops.
+     */
+    private void maybeReleaseKvSnapshotLease(long tableId) {
+        // Peek the buckets without removing so that, if the release RPC fails, we can keep
+        // the entry in leasedBucketsByTable for a later retry instead of permanently
+        // forgetting which buckets are still leased on the server side.
+        Set<TableBucket> buckets = leasedBucketsByTable.get(tableId);
+        if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+            // Nothing to release; clean up any empty entry.
+            leasedBucketsByTable.remove(tableId);
+            return;
+        }
+        LOG.info(
+                "Try to release kv snapshot lease {} for tiering table {} with {} buckets.",
+                kvSnapshotLeaseId,
+                tableId,
+                buckets.size());
+        // Take a defensive copy of the buckets to release so concurrent updates to the
+        // tracked set do not affect the in-flight RPC payload.
+        Set<TableBucket> bucketsToRelease = new HashSet<>(buckets);
+        try {
+            flussAdmin
+                    .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
+                    .releaseSnapshots(bucketsToRelease)
+                    .get();
+            // Only drop the bookkeeping entry after the server confirms the release.
+            Set<TableBucket> tracked = leasedBucketsByTable.get(tableId);
+            if (tracked != null) {
+                tracked.removeAll(bucketsToRelease);
+                if (tracked.isEmpty()) {
+                    leasedBucketsByTable.remove(tableId);
+                }
+            }
+        } catch (Exception e) {
+            if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) {
+                // Server does not support the lease API; drop tracking since release is a
+                // no-op and there is no point retrying.
+                leasedBucketsByTable.remove(tableId);
+                LOG.warn(
+                        "Failed to release kv snapshot lease for tiering table {} because the "
+                                + "server does not support kv snapshot lease API.",
+                        tableId,
+                        e);
+            } else {
+                // Keep the buckets tracked so we (or the next failover/close) can retry.
+                LOG.error(
+                        "Failed to release kv snapshot lease for tiering table {}; the buckets "
+                                + "remain tracked and will be retried on next release attempt.",
+                        tableId,
+                        e);
+            }
+        }
+    }
+
+    /**
+     * Asynchronous variant of {@link #maybeReleaseKvSnapshotLease(long)} used during failover to
+     * avoid blocking the coordinator thread when multiple tables need to be released.
+     */
+    private void maybeReleaseKvSnapshotLeaseAsync(long tableId) {

Review Comment:
   `maybeReleaseKvSnapshotLeaseAsync` is mostly the same as `maybeReleaseKvSnapshotLease`; can we just keep a single method?
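   One way to keep a single method, sketched under the assumption that the release RPC can be exposed as a `CompletableFuture` (modelled here by a hypothetical `BiFunction`, not the Fluss admin API, with `String` bucket ids): the method always returns the future, failover code ignores it, and callers that need to wait call `join()`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

final class LeaseReleases {

    private final Map<Long, Set<String>> leasedBucketsByTable = new ConcurrentHashMap<>();

    // Hypothetical async release RPC: completes once the server confirms the release.
    private final BiFunction<Long, Set<String>, CompletableFuture<Void>> releaseRpc;

    LeaseReleases(BiFunction<Long, Set<String>, CompletableFuture<Void>> releaseRpc) {
        this.releaseRpc = releaseRpc;
    }

    void track(long tableId, Set<String> buckets) {
        leasedBucketsByTable.put(tableId, buckets);
    }

    boolean isTracked(long tableId) {
        return leasedBucketsByTable.containsKey(tableId);
    }

    /**
     * Single release path. Failover code can fire and forget the returned future; callers that
     * must wait for the server can call join() on it.
     */
    CompletableFuture<Void> releaseLease(long tableId) {
        Set<String> buckets = leasedBucketsByTable.get(tableId);
        if (buckets == null || buckets.isEmpty()) {
            leasedBucketsByTable.remove(tableId);
            return CompletableFuture.completedFuture(null);
        }
        return releaseRpc
                .apply(tableId, buckets)
                .whenComplete(
                        (ignored, error) -> {
                            if (error == null) {
                                // Drop the entry only after the release was confirmed.
                                leasedBucketsByTable.remove(tableId, buckets);
                            }
                        });
    }
}
```

   On failure the returned future completes exceptionally and the entry stays tracked, so a blocking caller sees the error through `join()` while the failover path simply moves on.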



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +641,208 @@ public void close() throws IOException {
+    private void maybeReleaseKvSnapshotLease(long tableId) {
+        // Peek the buckets without removing so that, if the release RPC fails, we can keep
+        // the entry in leasedBucketsByTable for a later retry instead of permanently
+        // forgetting which buckets are still leased on the server side.
+        Set<TableBucket> buckets = leasedBucketsByTable.get(tableId);
+        if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+            // Nothing to release; clean up any empty entry.
+            leasedBucketsByTable.remove(tableId);
+            return;
+        }
+        LOG.info(
+                "Try to release kv snapshot lease {} for tiering table {} with {} buckets.",
+                kvSnapshotLeaseId,
+                tableId,
+                buckets.size());
+        // Take a defensive copy of the buckets to release so concurrent updates to the
+        // tracked set do not affect the in-flight RPC payload.
+        Set<TableBucket> bucketsToRelease = new HashSet<>(buckets);
+        try {
+            flussAdmin
+                    .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
+                    .releaseSnapshots(bucketsToRelease)
+                    .get();
+            // Only drop the bookkeeping entry after the server confirms the release.
+            Set<TableBucket> tracked = leasedBucketsByTable.get(tableId);
+            if (tracked != null) {
+                tracked.removeAll(bucketsToRelease);
+                if (tracked.isEmpty()) {
+                    leasedBucketsByTable.remove(tableId);
+                }
+            }
+        } catch (Exception e) {
+            if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) {
+                // Server does not support the lease API; drop tracking since release is a
+                // no-op and there is no point retrying.
+                leasedBucketsByTable.remove(tableId);
+                LOG.warn(
+                        "Failed to release kv snapshot lease for tiering table {} because the "
+                                + "server does not support kv snapshot lease API.",
+                        tableId,
+                        e);
+            } else {
+                // Keep the buckets tracked so we (or the next failover/close) can retry.

Review Comment:
   when will we do the retry?
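   In the code shown here, a failed release appears to be retried only if `maybeReleaseKvSnapshotLease` is called again for the same table. A minimal sketch of an explicit retry point, assuming a hypothetical `ReleaseRpc` interface, `String` bucket ids, and a `retryPendingReleases()` method that could be invoked from a periodic callback and once more from `close()` (none of these exist in this PR):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the release RPC; not the Fluss admin API.
interface ReleaseRpc {
    void release(long tableId, Set<String> buckets) throws Exception;
}

final class RetryingLeaseTracker {

    private final Map<Long, Set<String>> leasedBucketsByTable = new ConcurrentHashMap<>();
    private final ReleaseRpc rpc;

    RetryingLeaseTracker(ReleaseRpc rpc) {
        this.rpc = rpc;
    }

    void track(long tableId, Set<String> buckets) {
        leasedBucketsByTable.put(tableId, buckets);
    }

    /** Tries once; on failure the buckets stay tracked and false is returned. */
    boolean release(long tableId) {
        Set<String> buckets = leasedBucketsByTable.get(tableId);
        if (buckets == null) {
            return true;
        }
        try {
            rpc.release(tableId, buckets);
            leasedBucketsByTable.remove(tableId);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Explicit retry point for everything still tracked, e.g. called from a periodic callback
     * and once more from close(). Returns how many tables are still pending.
     */
    int retryPendingReleases() {
        int stillPending = 0;
        // The ConcurrentHashMap key-set iterator tolerates removals done by release().
        for (Long tableId : leasedBucketsByTable.keySet()) {
            if (!release(tableId)) {
                stillPending++;
            }
        }
        return stillPending;
    }
}
```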
   
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -96,6 +102,19 @@ public class TieringSourceEnumerator
 
     private static final Logger LOG = LoggerFactory.getLogger(TieringSourceEnumerator.class);
 
+    /**
+     * KV snapshot lease duration for the whole tiering job. One lease covers the entire job
+     * lifecycle; it is renewed implicitly by every {@code acquireSnapshots} call, so a relatively
+     * long duration is safe and also bounds the worst-case leaked-lease lifetime if the job dies
+     * abnormally.
+     *
+     * <p>TODO: introduce an explicit periodic lease-renewal mechanism so that a single tiering
+     * round that exceeds {@link #KV_SNAPSHOT_LEASE_DURATION_MS} (e.g. for very large tables) will
+     * not see its snapshots garbage-collected mid-flight. Tracked as a follow-up issue; tiering
+     * rounds are typically minute-level today so a 1-day lease is sufficient in practice.
+     */
+    private static final long KV_SNAPSHOT_LEASE_DURATION_MS = Duration.ofDays(1).toMillis();

Review Comment:
   Change to `Duration.ofHours(6)`? I think 6 hours should be fine for most cases.
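   A minimal sketch of the suggested constant, which also rewords the Javadoc so that it no longer links to the constant it documents (the self-reference nit raised earlier on the same Javadoc). `TieringLeaseConfig` is a hypothetical holder class used only so that the snippet compiles on its own; six hours is 21,600,000 ms.

```java
import java.time.Duration;

final class TieringLeaseConfig {

    /**
     * KV snapshot lease duration for the tiering job. A single tiering round that runs longer
     * than this duration may see its snapshots garbage-collected, so very large tables still
     * need an explicit periodic renewal (follow-up work).
     */
    static final long KV_SNAPSHOT_LEASE_DURATION_MS = Duration.ofHours(6).toMillis();
}
```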



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java:
##########
@@ -19,23 +19,55 @@
 
 import org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator;
 
-/** The marker class of stateless component {@link TieringSourceEnumerator}. */
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * The state of the {@link TieringSourceEnumerator}. Stores the KV snapshot lease id so that it can
+ * be recovered after a checkpoint restore, avoiding orphaned leases on the server side.
+ */
 public class TieringSourceEnumeratorState {

Review Comment:
   It would break the stateless design of the tiering service. I think we can just ignore the `kvSnapshotLeaseId` and let the Fluss cluster handle the snapshot TTL.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +641,208 @@ public void close() throws IOException {
+    private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit> tieringSplits) {
+        if (flussAdmin == null) {

Review Comment:
   When will `flussAdmin` be null?



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +641,208 @@ public void close() throws IOException {
+    private void maybeReleaseKvSnapshotLease(long tableId) {
+        // Peek the buckets without removing so that, if the release RPC fails, we can keep
+        // the entry in leasedBucketsByTable for a later retry instead of permanently
+        // forgetting which buckets are still leased on the server side.
+        Set<TableBucket> buckets = leasedBucketsByTable.get(tableId);
+        if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+            // Nothing to release; clean up any empty entry.
+            leasedBucketsByTable.remove(tableId);
+            return;
+        }
+        LOG.info(
+                "Try to release kv snapshot lease {} for tiering table {} with {} buckets.",
+                kvSnapshotLeaseId,
+                tableId,
+                buckets.size());
+        // Take a defensive copy of the buckets to release so concurrent updates to the
+        // tracked set do not affect the in-flight RPC payload.
+        Set<TableBucket> bucketsToRelease = new HashSet<>(buckets);
+        try {
+            flussAdmin
+                    .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
+                    .releaseSnapshots(bucketsToRelease)
+                    .get();
+            // Only drop the bookkeeping entry after the server confirms the release.
+            Set<TableBucket> tracked = leasedBucketsByTable.get(tableId);

Review Comment:
   Can we just call `leasedBucketsByTable.remove(tableId);`?
   IIUC, `bucketsToRelease` is the same as the buckets in `leasedBucketsByTable`.
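   A minimal sketch of a remove-first variant, assuming a hypothetical `BucketReleaser` interface and `String` bucket ids: the entry is removed before the RPC, so neither a defensive copy nor a post-release diff is needed, and on failure the buckets are merged back so that a later attempt can retry them. Merging rather than a plain `put` keeps any buckets acquired for the same table while the RPC was in flight.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the release RPC; not the Fluss admin API.
interface BucketReleaser {
    void release(long tableId, Set<String> buckets) throws Exception;
}

final class RemoveFirstRelease {

    static void release(
            Map<Long, Set<String>> leasedBucketsByTable, long tableId, BucketReleaser releaser) {
        // Take ownership of the whole entry up front.
        Set<String> buckets = leasedBucketsByTable.remove(tableId);
        if (buckets == null || buckets.isEmpty()) {
            return;
        }
        try {
            releaser.release(tableId, buckets);
        } catch (Exception e) {
            // Put the buckets back, unioned with anything tracked meanwhile, for a later retry.
            leasedBucketsByTable.merge(
                    tableId,
                    buckets,
                    (current, failed) -> {
                        Set<String> union = new HashSet<>(current);
                        union.addAll(failed);
                        return union;
                    });
        }
    }
}
```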



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
