Copilot commented on code in PR #3286:
URL: https://github.com/apache/fluss/pull/3286#discussion_r3212376980


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -515,6 +586,18 @@ public void close() throws IOException {
                 LOG.error("Failed to close Tiering Source enumerator.", e);
             }
         }
+        // NOTE: we intentionally do NOT drop the kv snapshot lease here. The lease id is
+        // persisted into the enumerator checkpoint state and will be reused by the restored
+        // enumerator after a JM failover. Dropping it on close would destroy the lease that
+        // the restored enumerator expects to reuse, potentially causing the referenced
+        // snapshots to be garbage-collected before tiering finishes. The lease will expire

Review Comment:
   The PR description says the enumerator performs a best-effort `dropLease` on 
close, but the implementation explicitly avoids dropping the lease here. Please 
either align the implementation with the description (e.g., drop when it’s 
safe/known not to be a restart) or update the PR description so operators 
understand the actual lifecycle/cleanup behavior.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +616,145 @@ public void close() throws IOException {
         }
     }
 
+    /**
+     * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the given table so that
+     * snapshots referenced by these splits will not be cleaned up by the Fluss server during
+     * tiering. Bucket-snapshot mappings are remembered in {@link #leasedBucketsByTable} so they can
+     * be released on finish/fail.
+     *
+     * <p>Falls back to a warning (no exception thrown) when the server does not support the kv
+     * snapshot lease API, to preserve compatibility with older Fluss clusters.
+     */
+    private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit> tieringSplits) {
+        if (flussAdmin == null) {
+            return;
+        }
+        Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+        for (TieringSplit split : tieringSplits) {
+            if (split.isTieringSnapshotSplit()) {
+                TieringSnapshotSplit snapshotSplit = split.asTieringSnapshotSplit();
+                bucketsToLease.put(snapshotSplit.getTableBucket(), snapshotSplit.getSnapshotId());
+            }
+        }
+        if (bucketsToLease.isEmpty()) {
+            return;
+        }
+        LOG.info(
+                "Try to acquire kv snapshot lease {} for tiering table {} with {} buckets.",
+                kvSnapshotLeaseId,
+                tableId,
+                bucketsToLease.size());
+        try {
+            flussAdmin
+                    .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
+                    .acquireSnapshots(bucketsToLease)
+                    .get();
+            leasedBucketsByTable
+                    .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet())
+                    .addAll(bucketsToLease.keySet());
+        } catch (Exception e) {
+            if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) {
+                LOG.warn(
+                        "Failed to acquire kv snapshot lease for tiering table {} because the "
+                                + "server does not support kv snapshot lease API. Snapshots may "
+                                + "be cleaned up earlier than expected. Please upgrade the Fluss "
+                                + "server to version 0.9 or later.",
+                        tableId,
+                        e);
+            } else {
+                LOG.error(
+                        "Failed to acquire kv snapshot lease for tiering table {}. "
+                                + "Tiering will proceed without snapshot protection; the "
+                                + "snapshot may be garbage-collected while tiering is in progress.",
+                        tableId,
+                        e);
+            }
+        }
+    }
+
+    /**
+     * Release the kv snapshot lease held for a specific table. Called when a table finishes
+     * tiering, fails, or is abandoned due to failover. Missing leases (log-only tables, or tables
+     * for which acquire failed) are handled as no-ops.
+     */
+    private void maybeReleaseKvSnapshotLease(long tableId) {
+        Set<TableBucket> buckets = leasedBucketsByTable.remove(tableId);
+        if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+            return;
+        }

Review Comment:
   `leasedBucketsByTable.remove(tableId)` happens before the RPC release 
succeeds. If `releaseSnapshots(..)` fails transiently, the enumerator loses the 
ability to retry and the server-side snapshots may remain leased until expiry. 
Consider only removing from `leasedBucketsByTable` after a successful release, 
or re-inserting the buckets on failure (similar to FlinkSourceEnumerator 
re-enqueueing buckets on release failure).
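
The "remove only after a successful release" ordering suggested above could look roughly like the sketch below. It is not the enumerator's actual code: `LeaseTracker`, the `String` bucket names, and the `releaseRpc` callback are hypothetical stand-ins for `leasedBucketsByTable`, `TableBucket`, and the blocking `releaseSnapshots(..).get()` call.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Sketch only: a hypothetical stand-in for the enumerator's lease bookkeeping. */
final class LeaseTracker {
    private final Map<Long, Set<String>> leasedBucketsByTable = new ConcurrentHashMap<>();

    void track(long tableId, Set<String> buckets) {
        leasedBucketsByTable
                .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet())
                .addAll(buckets);
    }

    boolean isTracked(long tableId) {
        return leasedBucketsByTable.containsKey(tableId);
    }

    /**
     * Calls the (assumed blocking) release RPC first and forgets the table only when the call
     * returns normally. Returns false when the entry was kept for a later retry.
     */
    boolean release(long tableId, Consumer<Set<String>> releaseRpc) {
        Set<String> buckets = leasedBucketsByTable.get(tableId);
        if (buckets == null || buckets.isEmpty()) {
            leasedBucketsByTable.remove(tableId);
            return true;
        }
        try {
            releaseRpc.accept(buckets);
        } catch (RuntimeException e) {
            // Transient failure: the buckets stay tracked so a later call can retry.
            return false;
        }
        leasedBucketsByTable.remove(tableId);
        return true;
    }
}
```

With this ordering a failed release costs one extra map entry until the retry succeeds, instead of silently leaving the lease to expire on the server.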



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java:
##########
@@ -38,14 +54,41 @@ public int getVersion() {
 
     @Override
     public byte[] serialize(TieringSourceEnumeratorState obj) throws IOException {
-        // no need to store anything
-        return new byte[0];
+        final DataOutputSerializer out = SERIALIZER_CACHE.get();
+        String leaseId = obj.getKvSnapshotLeaseId();
+        if (leaseId != null) {
+            out.writeBoolean(true);
+            out.writeUTF(leaseId);
+        } else {
+            out.writeBoolean(false);
+        }
+        final byte[] result = out.getCopyOfBuffer();
+        out.clear();
+        return result;

Review Comment:
   `DataOutputSerializer` is cleared only on the happy path. If `writeUTF` (or 
any future added write) throws, the ThreadLocal serializer keeps a dirty buffer 
and subsequent serializations can become corrupted. Wrap the writes in a 
try/finally and clear the serializer in the finally block.
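
A minimal sketch of the try/finally shape, under stated assumptions: it uses a plain `ByteArrayOutputStream` cached in a `ThreadLocal` as a stand-in for the reused `DataOutputSerializer`, and `reset()` plays the role of `clear()`. The class name `LeaseIdSerializerSketch` is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Sketch only: a JDK-only stand-in for the serializer's reused per-thread buffer. */
final class LeaseIdSerializerSketch {
    private static final ThreadLocal<ByteArrayOutputStream> BUFFER_CACHE =
            ThreadLocal.withInitial(() -> new ByteArrayOutputStream(64));

    static byte[] serialize(String leaseId) throws IOException {
        ByteArrayOutputStream buffer = BUFFER_CACHE.get();
        try {
            DataOutputStream out = new DataOutputStream(buffer);
            if (leaseId != null) {
                out.writeBoolean(true);
                // writeUTF throws UTFDataFormatException for strings longer than 65535 bytes.
                out.writeUTF(leaseId);
            } else {
                out.writeBoolean(false);
            }
            out.flush();
            return buffer.toByteArray();
        } finally {
            // Runs on both the normal and the exceptional path, so the cached
            // buffer never carries leftover bytes into the next call.
            buffer.reset();
        }
    }
}
```

In this stand-in, a failing `writeUTF` has already left the leading boolean byte in the buffer; without the `finally`, the next `serialize` call on the same thread would return that stale byte in front of its own output.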
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +616,145 @@ public void close() throws IOException {
         }
     }
 
+    /**
+     * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the given table so that
+     * snapshots referenced by these splits will not be cleaned up by the Fluss server during
+     * tiering. Bucket-snapshot mappings are remembered in {@link #leasedBucketsByTable} so they can
+     * be released on finish/fail.
+     *
+     * <p>Falls back to a warning (no exception thrown) when the server does not support the kv
+     * snapshot lease API, to preserve compatibility with older Fluss clusters.
+     */
+    private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit> tieringSplits) {
+        if (flussAdmin == null) {
+            return;
+        }
+        Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+        for (TieringSplit split : tieringSplits) {
+            if (split.isTieringSnapshotSplit()) {
+                TieringSnapshotSplit snapshotSplit = split.asTieringSnapshotSplit();
+                bucketsToLease.put(snapshotSplit.getTableBucket(), snapshotSplit.getSnapshotId());
+            }
+        }
+        if (bucketsToLease.isEmpty()) {
+            return;
+        }
+        LOG.info(
+                "Try to acquire kv snapshot lease {} for tiering table {} with {} buckets.",
+                kvSnapshotLeaseId,
+                tableId,
+                bucketsToLease.size());
+        try {
+            flussAdmin
+                    .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
+                    .acquireSnapshots(bucketsToLease)
+                    .get();
+            leasedBucketsByTable

Review Comment:
   `acquireSnapshots(...)` returns an `AcquireKvSnapshotLeaseResult` that can 
report unavailable snapshots (already GC’ed / missing). The current code 
ignores the result and then records *all* requested buckets as leased, which 
can hide partial acquisition and later attempt to tier snapshots that are 
already gone. Please inspect `getUnavailableSnapshots()` and at minimum 
log/metric it and avoid tracking unavailable buckets; ideally fail/retry split 
generation for the table when any requested snapshot is unavailable.
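
The bookkeeping this asks for can be sketched as below. Only the method name `getUnavailableSnapshots()` comes from the comment above; its return type is not shown here, so the sketch assumes the unavailable buckets can be reduced to a set of keys. `PartialLeaseSketch` and the `String` bucket names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Sketch only: models requested vs. unavailable snapshots with plain collections. */
final class PartialLeaseSketch {

    /** Buckets that are safe to record as leased: everything requested minus the unavailable ones. */
    static Set<String> bucketsToTrack(Map<String, Long> requested, Set<String> unavailable) {
        Set<String> leased = new LinkedHashSet<>(requested.keySet());
        leased.removeAll(unavailable);
        return leased;
    }

    /** True only when every requested snapshot was leased, i.e. tiering can rely on all of them. */
    static boolean fullyLeased(Map<String, Long> requested, Set<String> unavailable) {
        for (String bucket : requested.keySet()) {
            if (unavailable.contains(bucket)) {
                return false;
            }
        }
        return true;
    }
}
```

A caller could log the unavailable buckets, track only the result of `bucketsToTrack`, and use `fullyLeased` to decide whether to fail or retry split generation for the table.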



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +616,145 @@ public void close() throws IOException {
         }
     }
 
+    /**
+     * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the given table so that
+     * snapshots referenced by these splits will not be cleaned up by the Fluss server during
+     * tiering. Bucket-snapshot mappings are remembered in {@link #leasedBucketsByTable} so they can
+     * be released on finish/fail.
+     *
+     * <p>Falls back to a warning (no exception thrown) when the server does not support the kv
+     * snapshot lease API, to preserve compatibility with older Fluss clusters.
+     */
+    private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit> tieringSplits) {
+        if (flussAdmin == null) {
+            return;
+        }
+        Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+        for (TieringSplit split : tieringSplits) {
+            if (split.isTieringSnapshotSplit()) {
+                TieringSnapshotSplit snapshotSplit = split.asTieringSnapshotSplit();
+                bucketsToLease.put(snapshotSplit.getTableBucket(), snapshotSplit.getSnapshotId());
+            }
+        }
+        if (bucketsToLease.isEmpty()) {
+            return;
+        }
+        LOG.info(
+                "Try to acquire kv snapshot lease {} for tiering table {} with {} buckets.",
+                kvSnapshotLeaseId,
+                tableId,
+                bucketsToLease.size());
+        try {
+            flussAdmin
+                    .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
+                    .acquireSnapshots(bucketsToLease)
+                    .get();
+            leasedBucketsByTable
+                    .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet())
+                    .addAll(bucketsToLease.keySet());
+        } catch (Exception e) {
+            if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) {
+                LOG.warn(
+                        "Failed to acquire kv snapshot lease for tiering table {} because the "
+                                + "server does not support kv snapshot lease API. Snapshots may "
+                                + "be cleaned up earlier than expected. Please upgrade the Fluss "
+                                + "server to version 0.9 or later.",
+                        tableId,
+                        e);
+            } else {
+                LOG.error(
+                        "Failed to acquire kv snapshot lease for tiering table {}. "
+                                + "Tiering will proceed without snapshot protection; the "
+                                + "snapshot may be garbage-collected while tiering is in progress.",
+                        tableId,
+                        e);
+            }
+        }
+    }
+
+    /**
+     * Release the kv snapshot lease held for a specific table. Called when a table finishes
+     * tiering, fails, or is abandoned due to failover. Missing leases (log-only tables, or tables
+     * for which acquire failed) are handled as no-ops.
+     */
+    private void maybeReleaseKvSnapshotLease(long tableId) {
+        Set<TableBucket> buckets = leasedBucketsByTable.remove(tableId);
+        if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+            return;
+        }
+        LOG.info(
+                "Try to release kv snapshot lease {} for tiering table {} with {} buckets.",
+                kvSnapshotLeaseId,
+                tableId,
+                buckets.size());
+        try {
+            flussAdmin
+                    .createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
+                    .releaseSnapshots(buckets)
+                    .get();
+        } catch (Exception e) {
+            if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) {
+                LOG.warn(
+                        "Failed to release kv snapshot lease for tiering table {} because the "
+                                + "server does not support kv snapshot lease API.",
+                        tableId,
+                        e);
+            } else {
+                LOG.error("Failed to release kv snapshot lease for tiering table {}.", tableId, e);
+            }
+        }
+    }
+
+    /**
+     * Asynchronous variant of {@link #maybeReleaseKvSnapshotLease(long)} used during failover to
+     * avoid blocking the coordinator thread when multiple tables need to be released.
+     */
+    private void maybeReleaseKvSnapshotLeaseAsync(long tableId) {
+        Set<TableBucket> buckets = leasedBucketsByTable.remove(tableId);
+        if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+            return;
+        }

Review Comment:
   Same as the sync path: this async release removes the table entry from 
`leasedBucketsByTable` before the release completes. If the future completes 
exceptionally, there is no way to retry later, so the lease can remain held 
until expiry. Consider keeping the buckets tracked until completion, and 
restoring them on failure to allow later retries/cleanup.
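
For the async path, one possible shape is to leave the entry in place and drop it only in the completion callback. The sketch below assumes the release call returns a `CompletableFuture<Void>`; `AsyncLeaseTracker`, the `String` bucket names, and the `releaseRpc` function are hypothetical stand-ins, not the enumerator's API.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Sketch only: hypothetical async counterpart of the enumerator's lease bookkeeping. */
final class AsyncLeaseTracker {
    private final Map<Long, Set<String>> leasedBucketsByTable = new ConcurrentHashMap<>();

    void track(long tableId, Set<String> buckets) {
        leasedBucketsByTable
                .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet())
                .addAll(buckets);
    }

    boolean isTracked(long tableId) {
        return leasedBucketsByTable.containsKey(tableId);
    }

    /**
     * Starts the release without blocking. The table entry is removed only once the returned
     * future of the RPC completes normally; on exceptional completion it stays tracked.
     */
    CompletableFuture<Boolean> releaseAsync(
            long tableId, Function<Set<String>, CompletableFuture<Void>> releaseRpc) {
        Set<String> buckets = leasedBucketsByTable.get(tableId);
        if (buckets == null || buckets.isEmpty()) {
            leasedBucketsByTable.remove(tableId);
            return CompletableFuture.completedFuture(true);
        }
        return releaseRpc.apply(buckets).handle((ignored, error) -> {
            if (error != null) {
                // Keep the buckets tracked so a later retry or cleanup can find them.
                return false;
            }
            // Remove only if the mapping still points at the set that was released.
            leasedBucketsByTable.remove(tableId, buckets);
            return true;
        });
    }
}
```

The conditional `remove(tableId, buckets)` is a design choice in this sketch: it avoids dropping a mapping that was replaced while the release was in flight.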



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
