Copilot commented on code in PR #3286:
URL: https://github.com/apache/fluss/pull/3286#discussion_r3212376980
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -515,6 +586,18 @@ public void close() throws IOException {
LOG.error("Failed to close Tiering Source enumerator.", e);
}
}
+ // NOTE: we intentionally do NOT drop the kv snapshot lease here. The lease id is
+ // persisted into the enumerator checkpoint state and will be reused by the restored
+ // enumerator after a JM failover. Dropping it on close would destroy the lease that
+ // the restored enumerator expects to reuse, potentially causing the referenced
+ // snapshots to be garbage-collected before tiering finishes. The lease will expire
Review Comment:
The PR description says the enumerator performs a best-effort `dropLease` on
close, but the implementation explicitly avoids dropping the lease here. Please
either align the implementation with the description (e.g., drop when it’s
safe/known not to be a restart) or update the PR description so operators
understand the actual lifecycle/cleanup behavior.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +616,145 @@ public void close() throws IOException {
}
}
+ /**
+ * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the
given table so that
+ * snapshots referenced by these splits will not be cleaned up by the
Fluss server during
+ * tiering. Bucket-snapshot mappings are remembered in {@link
#leasedBucketsByTable} so they can
+ * be released on finish/fail.
+ *
+ * <p>Falls back to a warning (no exception thrown) when the server does
not support the kv
+ * snapshot lease API, to preserve compatibility with older Fluss clusters.
+ */
+ private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit>
tieringSplits) {
+ if (flussAdmin == null) {
+ return;
+ }
+ Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+ for (TieringSplit split : tieringSplits) {
+ if (split.isTieringSnapshotSplit()) {
+ TieringSnapshotSplit snapshotSplit =
split.asTieringSnapshotSplit();
+ bucketsToLease.put(snapshotSplit.getTableBucket(),
snapshotSplit.getSnapshotId());
+ }
+ }
+ if (bucketsToLease.isEmpty()) {
+ return;
+ }
+ LOG.info(
+ "Try to acquire kv snapshot lease {} for tiering table {} with
{} buckets.",
+ kvSnapshotLeaseId,
+ tableId,
+ bucketsToLease.size());
+ try {
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId,
KV_SNAPSHOT_LEASE_DURATION_MS)
+ .acquireSnapshots(bucketsToLease)
+ .get();
+ leasedBucketsByTable
+ .computeIfAbsent(tableId, k ->
ConcurrentHashMap.newKeySet())
+ .addAll(bucketsToLease.keySet());
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class).isPresent()) {
+ LOG.warn(
+ "Failed to acquire kv snapshot lease for tiering table
{} because the "
+ + "server does not support kv snapshot lease
API. Snapshots may "
+ + "be cleaned up earlier than expected. Please
upgrade the Fluss "
+ + "server to version 0.9 or later.",
+ tableId,
+ e);
+ } else {
+ LOG.error(
+ "Failed to acquire kv snapshot lease for tiering table
{}. "
+ + "Tiering will proceed without snapshot
protection; the "
+ + "snapshot may be garbage-collected while
tiering is in progress.",
+ tableId,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Release the kv snapshot lease held for a specific table. Called when a table finishes
+ * tiering, fails, or is abandoned due to failover. Missing leases (log-only tables, or tables
+ * for which acquire failed) are handled as no-ops.
+ */
+ private void maybeReleaseKvSnapshotLease(long tableId) {
+ Set<TableBucket> buckets = leasedBucketsByTable.remove(tableId);
+ if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+ return;
+ }
Review Comment:
`leasedBucketsByTable.remove(tableId)` happens before the RPC release
succeeds. If `releaseSnapshots(..)` fails transiently, the enumerator loses the
ability to retry and the server-side snapshots may remain leased until expiry.
Consider only removing from `leasedBucketsByTable` after a successful release,
or re-inserting the buckets on failure (similar to FlinkSourceEnumerator
re-enqueueing buckets on release failure).
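
A minimal sketch of the "remove only after a successful release" option. It is not the enumerator's actual code: `LeaseTracker`, the `Integer` bucket ids, and the `releaseRpc` callback are hypothetical stand-ins for `leasedBucketsByTable`, `TableBucket`, and the blocking `releaseSnapshots(..).get()` call.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Sketch: keep buckets tracked until the release call has succeeded. */
public class LeaseTracker {
    // tableId -> buckets still considered leased (plain Integers stand in for TableBucket).
    private final Map<Long, Set<Integer>> leasedBucketsByTable = new ConcurrentHashMap<>();

    public void track(long tableId, Set<Integer> buckets) {
        leasedBucketsByTable
                .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet())
                .addAll(buckets);
    }

    /**
     * Hypothetical release path. releaseRpc stands in for the blocking release call
     * and is assumed to throw a RuntimeException on failure.
     *
     * @return true if nothing remains to retry for this call, false if the release failed
     */
    public boolean release(long tableId, Consumer<Set<Integer>> releaseRpc) {
        Set<Integer> buckets = leasedBucketsByTable.get(tableId);
        if (buckets == null) {
            return true; // nothing was leased for this table
        }
        // Copy first, so buckets added concurrently are not dropped without being released.
        Set<Integer> toRelease = new HashSet<>(buckets);
        try {
            releaseRpc.accept(toRelease);
        } catch (RuntimeException e) {
            return false; // the entry is left in place so a later call can retry
        }
        // Forget only what was actually released; drop the table entry once it is empty.
        leasedBucketsByTable.computeIfPresent(
                tableId,
                (k, v) -> {
                    v.removeAll(toRelease);
                    return v.isEmpty() ? null : v;
                });
        return true;
    }

    public boolean isTracked(long tableId) {
        return leasedBucketsByTable.containsKey(tableId);
    }
}
```

With this shape, a failed release leaves the table entry visible to whatever retry or cleanup hook the enumerator already has. The trade-off is that a release which never succeeds keeps the entry until the lease expires or the enumerator closes.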
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorStateSerializer.java:
##########
@@ -38,14 +54,41 @@ public int getVersion() {
@Override
public byte[] serialize(TieringSourceEnumeratorState obj) throws IOException {
- // no need to store anything
- return new byte[0];
+ final DataOutputSerializer out = SERIALIZER_CACHE.get();
+ String leaseId = obj.getKvSnapshotLeaseId();
+ if (leaseId != null) {
+ out.writeBoolean(true);
+ out.writeUTF(leaseId);
+ } else {
+ out.writeBoolean(false);
+ }
+ final byte[] result = out.getCopyOfBuffer();
+ out.clear();
+ return result;
Review Comment:
`DataOutputSerializer` is cleared only on the happy path. If `writeUTF` (or
any future added write) throws, the ThreadLocal serializer keeps a dirty buffer
and subsequent serializations can become corrupted. Wrap the writes in a
try/finally and clear the serializer in the finally block.
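
A self-contained sketch of the try/finally pattern. To stay runnable without Flink on the classpath it uses a thread-local `ByteArrayOutputStream` as a stand-in for the cached `DataOutputSerializer`, with `reset()` playing the role of `clear()`. The class name `LeaseIdSerializer` is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Sketch: always reset a reused thread-local buffer, even when a write throws. */
public class LeaseIdSerializer {
    // Stand-in for SERIALIZER_CACHE; one reusable buffer per thread.
    private static final ThreadLocal<ByteArrayOutputStream> BUFFER_CACHE =
            ThreadLocal.withInitial(() -> new ByteArrayOutputStream(64));

    public static byte[] serialize(String leaseId) throws IOException {
        ByteArrayOutputStream buffer = BUFFER_CACHE.get();
        try {
            DataOutputStream out = new DataOutputStream(buffer);
            if (leaseId != null) {
                out.writeBoolean(true);
                // writeUTF throws UTFDataFormatException for strings whose encoding exceeds
                // 65535 bytes; by then the boolean above is already in the buffer.
                out.writeUTF(leaseId);
            } else {
                out.writeBoolean(false);
            }
            out.flush();
            return buffer.toByteArray();
        } finally {
            // Runs on both the normal and the exceptional path, so the next
            // serialization on this thread starts from an empty buffer.
            buffer.reset();
        }
    }
}
```

Without the `finally`, a failed `writeUTF` would leave the already-written presence flag in the buffer, and the next call on the same thread would emit that stray byte in front of its own payload.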
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +616,145 @@ public void close() throws IOException {
}
}
+ /**
+ * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the
given table so that
+ * snapshots referenced by these splits will not be cleaned up by the
Fluss server during
+ * tiering. Bucket-snapshot mappings are remembered in {@link
#leasedBucketsByTable} so they can
+ * be released on finish/fail.
+ *
+ * <p>Falls back to a warning (no exception thrown) when the server does
not support the kv
+ * snapshot lease API, to preserve compatibility with older Fluss clusters.
+ */
+ private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit>
tieringSplits) {
+ if (flussAdmin == null) {
+ return;
+ }
+ Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+ for (TieringSplit split : tieringSplits) {
+ if (split.isTieringSnapshotSplit()) {
+ TieringSnapshotSplit snapshotSplit =
split.asTieringSnapshotSplit();
+ bucketsToLease.put(snapshotSplit.getTableBucket(),
snapshotSplit.getSnapshotId());
+ }
+ }
+ if (bucketsToLease.isEmpty()) {
+ return;
+ }
+ LOG.info(
+ "Try to acquire kv snapshot lease {} for tiering table {} with
{} buckets.",
+ kvSnapshotLeaseId,
+ tableId,
+ bucketsToLease.size());
+ try {
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId,
KV_SNAPSHOT_LEASE_DURATION_MS)
+ .acquireSnapshots(bucketsToLease)
+ .get();
+ leasedBucketsByTable
Review Comment:
`acquireSnapshots(...)` returns an `AcquireKvSnapshotLeaseResult` that can
report unavailable snapshots (already GC’ed / missing). The current code
ignores the result and then records *all* requested buckets as leased, which
can hide partial acquisition and later attempt to tier snapshots that are
already gone. Please inspect `getUnavailableSnapshots()` and at minimum
log/metric it and avoid tracking unavailable buckets; ideally fail/retry split
generation for the table when any requested snapshot is unavailable.
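
A rough sketch of the "avoid tracking unavailable buckets" part. The exact return type of `getUnavailableSnapshots()` is not shown in this diff, so the code assumes the unavailable buckets can be obtained as a `Set`. `PartialLeaseFilter` and `leasedOnly` are hypothetical names, and the bucket type is left generic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Sketch: record only the buckets whose snapshots were actually leased. */
public class PartialLeaseFilter {

    /**
     * Returns a copy of the requested bucket-to-snapshot map without the buckets that
     * the server reported as unavailable. The input map is left unmodified.
     *
     * @param requested bucket -> snapshot id, as sent in the acquire request
     * @param unavailable buckets reported as unavailable (assumed shape, see lead-in)
     */
    public static <B> Map<B, Long> leasedOnly(Map<B, Long> requested, Set<B> unavailable) {
        Map<B, Long> leased = new HashMap<>(requested);
        leased.keySet().removeAll(unavailable);
        return leased;
    }
}
```

The caller could then add only `leasedOnly(...).keySet()` to `leasedBucketsByTable`, and log or fail split generation when the filtered map is smaller than the requested one.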
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +616,145 @@ public void close() throws IOException {
}
}
+ /**
+ * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the
given table so that
+ * snapshots referenced by these splits will not be cleaned up by the
Fluss server during
+ * tiering. Bucket-snapshot mappings are remembered in {@link
#leasedBucketsByTable} so they can
+ * be released on finish/fail.
+ *
+ * <p>Falls back to a warning (no exception thrown) when the server does
not support the kv
+ * snapshot lease API, to preserve compatibility with older Fluss clusters.
+ */
+ private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit>
tieringSplits) {
+ if (flussAdmin == null) {
+ return;
+ }
+ Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+ for (TieringSplit split : tieringSplits) {
+ if (split.isTieringSnapshotSplit()) {
+ TieringSnapshotSplit snapshotSplit =
split.asTieringSnapshotSplit();
+ bucketsToLease.put(snapshotSplit.getTableBucket(),
snapshotSplit.getSnapshotId());
+ }
+ }
+ if (bucketsToLease.isEmpty()) {
+ return;
+ }
+ LOG.info(
+ "Try to acquire kv snapshot lease {} for tiering table {} with
{} buckets.",
+ kvSnapshotLeaseId,
+ tableId,
+ bucketsToLease.size());
+ try {
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId,
KV_SNAPSHOT_LEASE_DURATION_MS)
+ .acquireSnapshots(bucketsToLease)
+ .get();
+ leasedBucketsByTable
+ .computeIfAbsent(tableId, k ->
ConcurrentHashMap.newKeySet())
+ .addAll(bucketsToLease.keySet());
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class).isPresent()) {
+ LOG.warn(
+ "Failed to acquire kv snapshot lease for tiering table
{} because the "
+ + "server does not support kv snapshot lease
API. Snapshots may "
+ + "be cleaned up earlier than expected. Please
upgrade the Fluss "
+ + "server to version 0.9 or later.",
+ tableId,
+ e);
+ } else {
+ LOG.error(
+ "Failed to acquire kv snapshot lease for tiering table
{}. "
+ + "Tiering will proceed without snapshot
protection; the "
+ + "snapshot may be garbage-collected while
tiering is in progress.",
+ tableId,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Release the kv snapshot lease held for a specific table. Called when a
table finishes
+ * tiering, fails, or is abandoned due to failover. Missing leases
(log-only tables, or tables
+ * for which acquire failed) are handled as no-ops.
+ */
+ private void maybeReleaseKvSnapshotLease(long tableId) {
+ Set<TableBucket> buckets = leasedBucketsByTable.remove(tableId);
+ if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+ return;
+ }
+ LOG.info(
+ "Try to release kv snapshot lease {} for tiering table {} with
{} buckets.",
+ kvSnapshotLeaseId,
+ tableId,
+ buckets.size());
+ try {
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId,
KV_SNAPSHOT_LEASE_DURATION_MS)
+ .releaseSnapshots(buckets)
+ .get();
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class).isPresent()) {
+ LOG.warn(
+ "Failed to release kv snapshot lease for tiering table
{} because the "
+ + "server does not support kv snapshot lease
API.",
+ tableId,
+ e);
+ } else {
+ LOG.error("Failed to release kv snapshot lease for tiering
table {}.", tableId, e);
+ }
+ }
+ }
+
+ /**
+ * Asynchronous variant of {@link #maybeReleaseKvSnapshotLease(long)} used during failover to
+ * avoid blocking the coordinator thread when multiple tables need to be released.
+ */
+ private void maybeReleaseKvSnapshotLeaseAsync(long tableId) {
+ Set<TableBucket> buckets = leasedBucketsByTable.remove(tableId);
+ if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+ return;
+ }
Review Comment:
Same as the sync path: this async release removes the table entry from
`leasedBucketsByTable` before the release completes. If the future completes
exceptionally, there is no way to retry later, so the lease can remain held
until expiry. Consider keeping the buckets tracked until completion, and
restoring them on failure to allow later retries/cleanup.
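
One possible shape for the "restore on failure" option in the async path, using only `CompletableFuture`. `AsyncLeaseRelease`, the `Integer` bucket ids, and the `releaseRpc` function are hypothetical stand-ins, not the enumerator's real types.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Sketch: put buckets back into the tracking map when an async release fails. */
public class AsyncLeaseRelease {
    // tableId -> buckets still considered leased (plain Integers stand in for TableBucket).
    private final Map<Long, Set<Integer>> leasedBucketsByTable = new ConcurrentHashMap<>();

    public void track(long tableId, Set<Integer> buckets) {
        leasedBucketsByTable
                .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet())
                .addAll(buckets);
    }

    /**
     * releaseRpc stands in for the non-blocking release call. The returned future
     * completes with true on success and false when the buckets were restored.
     */
    public CompletableFuture<Boolean> releaseAsync(
            long tableId, Function<Set<Integer>, CompletableFuture<Void>> releaseRpc) {
        Set<Integer> buckets = leasedBucketsByTable.remove(tableId);
        if (buckets == null || buckets.isEmpty()) {
            return CompletableFuture.completedFuture(true);
        }
        CompletableFuture<Void> rpc;
        try {
            rpc = releaseRpc.apply(buckets);
        } catch (RuntimeException e) {
            restore(tableId, buckets); // the call failed before a future was even returned
            return CompletableFuture.completedFuture(false);
        }
        return rpc.handle(
                (ignored, error) -> {
                    if (error != null) {
                        restore(tableId, buckets); // allow a later retry or cleanup
                        return false;
                    }
                    return true;
                });
    }

    private void restore(long tableId, Set<Integer> buckets) {
        leasedBucketsByTable
                .computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet())
                .addAll(buckets);
    }

    public boolean isTracked(long tableId) {
        return leasedBucketsByTable.containsKey(tableId);
    }
}
```

Removing eagerly and restoring on failure keeps the coordinator thread non-blocking while still leaving a handle for retries. If the same table can be re-leased while a release is in flight, merging the restored buckets into the new entry (as `restore` does here) may or may not be the desired behavior.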