luoyuxia commented on code in PR #3286:
URL: https://github.com/apache/fluss/pull/3286#discussion_r3212825081
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -245,11 +307,23 @@ public void addReader(int subtaskId) {
int globalMaxAttempt = max(maxAttempts);
if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) {
LOG.info(
- "Failover completed. All {} subtasks reached
the same attempt number {}. Current registered readers are {}",
+ "All {} subtasks reached the same attempt
number {}. Current registered readers are {}. Waiting for failed-table report
to complete before clearing failover state.",
Review Comment:
why change this failover logic?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -313,10 +391,18 @@ private void handleSourceReaderFailOver() {
tieringTableEpochs);
// we need to make all as failed
failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
+ // release all currently leased buckets for tables that are being
marked failed;
+ // take a snapshot of the keys first to avoid concurrent modification.
+ Set<Long> tableIdsToRelease = new
HashSet<>(tieringTableEpochs.keySet());
tieringTableEpochs.clear();
tieringReachMaxDurationsTables.clear();
// also clean all pending splits since we mark all as failed
pendingSplits.clear();
+ // Release leases asynchronously to avoid blocking the coordinator
thread when
+ // multiple tables are involved and RPC calls may time out.
+ for (Long tableId : tableIdsToRelease) {
Review Comment:
can we just release all in `leasedBucketsByTable`? So that we won't need to
collect `tableIdsToRelease`.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -96,6 +102,19 @@ public class TieringSourceEnumerator
private static final Logger LOG =
LoggerFactory.getLogger(TieringSourceEnumerator.class);
+ /**
+ * KV snapshot lease duration for the whole tiering job. One lease covers
the entire job
+ * lifecycle; it is renewed implicitly by every {@code acquireSnapshots}
call, so a relatively
+ * long duration is safe and also bounds the worst-case leaked-lease
lifetime if the job dies
+ * abnormally.
+ *
+ * <p>TODO: introduce an explicit periodic lease-renewal mechanism so that
a single tiering
+ * round that exceeds {@link #KV_SNAPSHOT_LEASE_DURATION_MS} (e.g. for
very large tables) will
Review Comment:
nit: Javadoc pointing to itself
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java:
##########
@@ -696,13 +705,25 @@ private void waitUntilTieringTableSplitAssignmentReady(
int expectedSplitsNum,
long sleepMs)
throws Throwable {
+ long startTime = System.currentTimeMillis();
+ long timeoutMs = 10000; // 10秒超时
+
while (context.getSplitsAssignmentSequence().size() <
expectedSplitsNum) {
+ if (System.currentTimeMillis() - startTime > timeoutMs) {
+ throw new AssertionError(
+ String.format(
+ "等待分配超时: 期望 %d 个分配, 实际 %d 个分配, 超时时间 %dms",
Review Comment:
why change this?
Also, please use engligh
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -184,6 +233,19 @@ public void handleSplitRequest(int subtaskId, @Nullable
String requesterHostname
LOG.info("TieringSourceReader {} requests split.", subtaskId);
readersAwaitingSplit.add(subtaskId);
+ // During failover we must not request a new tiering table or assign
any splits.
+ // Otherwise we might re-acquire leases for a table whose leases were
just released by
+ // handleSourceReaderFailOver, or assign splits to subtasks whose
readers from the new
+ // attempt have not been fully registered yet. The pending
failed-table report and the
+ // subsequent split request will be driven by the periodic callAsync
once failover is
+ // marked complete.
+ if (isFailOvering) {
Review Comment:
why change this failover logic?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +641,208 @@ public void close() throws IOException {
}
}
+ /**
+ * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the
given table so that
+ * snapshots referenced by these splits will not be cleaned up by the
Fluss server during
+ * tiering. Bucket-snapshot mappings are remembered in {@link
#leasedBucketsByTable} so they can
+ * be released on finish/fail.
+ *
+ * <p>Falls back to a warning (no exception thrown) when the server does
not support the kv
+ * snapshot lease API, to preserve compatibility with older Fluss clusters.
+ */
+ private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit>
tieringSplits) {
+ if (flussAdmin == null) {
+ return;
+ }
+ Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+ for (TieringSplit split : tieringSplits) {
+ if (split.isTieringSnapshotSplit()) {
+ TieringSnapshotSplit snapshotSplit =
split.asTieringSnapshotSplit();
+ bucketsToLease.put(snapshotSplit.getTableBucket(),
snapshotSplit.getSnapshotId());
+ }
+ }
+ if (bucketsToLease.isEmpty()) {
+ return;
+ }
+ LOG.info(
+ "Try to acquire kv snapshot lease {} for tiering table {} with
{} buckets.",
+ kvSnapshotLeaseId,
+ tableId,
+ bucketsToLease.size());
+ try {
+ Set<TableBucket> unavailableBuckets =
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId,
KV_SNAPSHOT_LEASE_DURATION_MS)
+ .acquireSnapshots(bucketsToLease)
+ .get()
+ .getUnavailableTableBucketSet();
+ // Only record successfully leased buckets so we don't later try
to release
+ // buckets that were never actually acquired (e.g. snapshots
already GC'ed or
+ // missing on the server).
+ Set<TableBucket> acquiredBuckets = new
HashSet<>(bucketsToLease.keySet());
+ if (!unavailableBuckets.isEmpty()) {
+ LOG.warn(
+ "Failed to acquire kv snapshot lease for {} of {}
buckets of tiering "
+ + "table {}: {}. The corresponding snapshots
may have already "
+ + "been garbage-collected; tiering for those
buckets may fail "
+ + "later when the snapshots are accessed.",
+ unavailableBuckets.size(),
+ bucketsToLease.size(),
+ tableId,
+ unavailableBuckets);
+ acquiredBuckets.removeAll(unavailableBuckets);
+ }
+ if (!acquiredBuckets.isEmpty()) {
+ leasedBucketsByTable
+ .computeIfAbsent(tableId, k ->
ConcurrentHashMap.newKeySet())
+ .addAll(acquiredBuckets);
+ }
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class).isPresent()) {
+ LOG.warn(
+ "Failed to acquire kv snapshot lease for tiering table
{} because the "
+ + "server does not support kv snapshot lease
API. Snapshots may "
+ + "be cleaned up earlier than expected. Please
upgrade the Fluss "
+ + "server to version 0.9 or later.",
+ tableId,
+ e);
+ } else {
+ LOG.error(
+ "Failed to acquire kv snapshot lease for tiering table
{}. "
+ + "Tiering will proceed without snapshot
protection; the "
+ + "snapshot may be garbage-collected while
tiering is in progress.",
+ tableId,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Release the kv snapshot lease held for a specific table. Called when a
table finishes
+ * tiering, fails, or is abandoned due to failover. Missing leases
(log-only tables, or tables
+ * for which acquire failed) are handled as no-ops.
+ */
+ private void maybeReleaseKvSnapshotLease(long tableId) {
+ // Peek the buckets without removing so that, if the release RPC
fails, we can keep
+ // the entry in leasedBucketsByTable for a later retry instead of
permanently
+ // forgetting which buckets are still leased on the server side.
+ Set<TableBucket> buckets = leasedBucketsByTable.get(tableId);
+ if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+ // Nothing to release; clean up any empty entry.
+ leasedBucketsByTable.remove(tableId);
+ return;
+ }
+ LOG.info(
+ "Try to release kv snapshot lease {} for tiering table {} with
{} buckets.",
+ kvSnapshotLeaseId,
+ tableId,
+ buckets.size());
+ // Take a defensive copy of the buckets to release so concurrent
updates to the
+ // tracked set do not affect the in-flight RPC payload.
+ Set<TableBucket> bucketsToRelease = new HashSet<>(buckets);
+ try {
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId,
KV_SNAPSHOT_LEASE_DURATION_MS)
+ .releaseSnapshots(bucketsToRelease)
+ .get();
+ // Only drop the bookkeeping entry after the server confirms the
release.
+ Set<TableBucket> tracked = leasedBucketsByTable.get(tableId);
+ if (tracked != null) {
+ tracked.removeAll(bucketsToRelease);
+ if (tracked.isEmpty()) {
+ leasedBucketsByTable.remove(tableId);
+ }
+ }
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class).isPresent()) {
+ // Server does not support the lease API; drop tracking since
release is a
+ // no-op and there is no point retrying.
+ leasedBucketsByTable.remove(tableId);
+ LOG.warn(
+ "Failed to release kv snapshot lease for tiering table
{} because the "
+ + "server does not support kv snapshot lease
API.",
+ tableId,
+ e);
+ } else {
+ // Keep the buckets tracked so we (or the next failover/close)
can retry.
+ LOG.error(
+ "Failed to release kv snapshot lease for tiering table
{}; the buckets "
+ + "remain tracked and will be retried on next
release attempt.",
+ tableId,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Asynchronous variant of {@link #maybeReleaseKvSnapshotLease(long)} used
during failover to
+ * avoid blocking the coordinator thread when multiple tables need to be
released.
+ */
+ private void maybeReleaseKvSnapshotLeaseAsync(long tableId) {
Review Comment:
maybeReleaseKvSnapshotLeaseAsync is most same to
maybeReleaseKvSnapshotLease, can we just keep one single method?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +641,208 @@ public void close() throws IOException {
}
}
+ /**
+ * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the
given table so that
+ * snapshots referenced by these splits will not be cleaned up by the
Fluss server during
+ * tiering. Bucket-snapshot mappings are remembered in {@link
#leasedBucketsByTable} so they can
+ * be released on finish/fail.
+ *
+ * <p>Falls back to a warning (no exception thrown) when the server does
not support the kv
+ * snapshot lease API, to preserve compatibility with older Fluss clusters.
+ */
+ private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit>
tieringSplits) {
+ if (flussAdmin == null) {
+ return;
+ }
+ Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+ for (TieringSplit split : tieringSplits) {
+ if (split.isTieringSnapshotSplit()) {
+ TieringSnapshotSplit snapshotSplit =
split.asTieringSnapshotSplit();
+ bucketsToLease.put(snapshotSplit.getTableBucket(),
snapshotSplit.getSnapshotId());
+ }
+ }
+ if (bucketsToLease.isEmpty()) {
+ return;
+ }
+ LOG.info(
+ "Try to acquire kv snapshot lease {} for tiering table {} with
{} buckets.",
+ kvSnapshotLeaseId,
+ tableId,
+ bucketsToLease.size());
+ try {
+ Set<TableBucket> unavailableBuckets =
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId,
KV_SNAPSHOT_LEASE_DURATION_MS)
+ .acquireSnapshots(bucketsToLease)
+ .get()
+ .getUnavailableTableBucketSet();
+ // Only record successfully leased buckets so we don't later try
to release
+ // buckets that were never actually acquired (e.g. snapshots
already GC'ed or
+ // missing on the server).
+ Set<TableBucket> acquiredBuckets = new
HashSet<>(bucketsToLease.keySet());
+ if (!unavailableBuckets.isEmpty()) {
+ LOG.warn(
+ "Failed to acquire kv snapshot lease for {} of {}
buckets of tiering "
+ + "table {}: {}. The corresponding snapshots
may have already "
+ + "been garbage-collected; tiering for those
buckets may fail "
+ + "later when the snapshots are accessed.",
+ unavailableBuckets.size(),
+ bucketsToLease.size(),
+ tableId,
+ unavailableBuckets);
+ acquiredBuckets.removeAll(unavailableBuckets);
+ }
+ if (!acquiredBuckets.isEmpty()) {
+ leasedBucketsByTable
+ .computeIfAbsent(tableId, k ->
ConcurrentHashMap.newKeySet())
+ .addAll(acquiredBuckets);
+ }
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class).isPresent()) {
+ LOG.warn(
+ "Failed to acquire kv snapshot lease for tiering table
{} because the "
+ + "server does not support kv snapshot lease
API. Snapshots may "
+ + "be cleaned up earlier than expected. Please
upgrade the Fluss "
+ + "server to version 0.9 or later.",
+ tableId,
+ e);
+ } else {
+ LOG.error(
+ "Failed to acquire kv snapshot lease for tiering table
{}. "
+ + "Tiering will proceed without snapshot
protection; the "
+ + "snapshot may be garbage-collected while
tiering is in progress.",
+ tableId,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Release the kv snapshot lease held for a specific table. Called when a
table finishes
+ * tiering, fails, or is abandoned due to failover. Missing leases
(log-only tables, or tables
+ * for which acquire failed) are handled as no-ops.
+ */
+ private void maybeReleaseKvSnapshotLease(long tableId) {
+ // Peek the buckets without removing so that, if the release RPC
fails, we can keep
+ // the entry in leasedBucketsByTable for a later retry instead of
permanently
+ // forgetting which buckets are still leased on the server side.
+ Set<TableBucket> buckets = leasedBucketsByTable.get(tableId);
+ if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+ // Nothing to release; clean up any empty entry.
+ leasedBucketsByTable.remove(tableId);
+ return;
+ }
+ LOG.info(
+ "Try to release kv snapshot lease {} for tiering table {} with
{} buckets.",
+ kvSnapshotLeaseId,
+ tableId,
+ buckets.size());
+ // Take a defensive copy of the buckets to release so concurrent
updates to the
+ // tracked set do not affect the in-flight RPC payload.
+ Set<TableBucket> bucketsToRelease = new HashSet<>(buckets);
+ try {
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId,
KV_SNAPSHOT_LEASE_DURATION_MS)
+ .releaseSnapshots(bucketsToRelease)
+ .get();
+ // Only drop the bookkeeping entry after the server confirms the
release.
+ Set<TableBucket> tracked = leasedBucketsByTable.get(tableId);
+ if (tracked != null) {
+ tracked.removeAll(bucketsToRelease);
+ if (tracked.isEmpty()) {
+ leasedBucketsByTable.remove(tableId);
+ }
+ }
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class).isPresent()) {
+ // Server does not support the lease API; drop tracking since
release is a
+ // no-op and there is no point retrying.
+ leasedBucketsByTable.remove(tableId);
+ LOG.warn(
+ "Failed to release kv snapshot lease for tiering table
{} because the "
+ + "server does not support kv snapshot lease
API.",
+ tableId,
+ e);
+ } else {
+ // Keep the buckets tracked so we (or the next failover/close)
can retry.
Review Comment:
when will we do the retry?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -96,6 +102,19 @@ public class TieringSourceEnumerator
private static final Logger LOG =
LoggerFactory.getLogger(TieringSourceEnumerator.class);
+ /**
+ * KV snapshot lease duration for the whole tiering job. One lease covers
the entire job
+ * lifecycle; it is renewed implicitly by every {@code acquireSnapshots}
call, so a relatively
+ * long duration is safe and also bounds the worst-case leaked-lease
lifetime if the job dies
+ * abnormally.
+ *
+ * <p>TODO: introduce an explicit periodic lease-renewal mechanism so that
a single tiering
+ * round that exceeds {@link #KV_SNAPSHOT_LEASE_DURATION_MS} (e.g. for
very large tables) will
+ * not see its snapshots garbage-collected mid-flight. Tracked as a
follow-up issue; tiering
+ * rounds are typically minute-level today so a 1-day lease is sufficient
in practice.
+ */
+ private static final long KV_SNAPSHOT_LEASE_DURATION_MS =
Duration.ofDays(1).toMillis();
Review Comment:
change to `Duration.ofHours(6)`? I think 6 hource should be fine for most of
case.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSourceEnumeratorState.java:
##########
@@ -19,23 +19,55 @@
import
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator;
-/** The marker class of stateless component {@link TieringSourceEnumerator}. */
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * The state of the {@link TieringSourceEnumerator}. Stores the KV snapshot
lease id so that it can
+ * be recovered after a checkpoint restore, avoiding orphaned leases on the
server side.
+ */
public class TieringSourceEnumeratorState {
Review Comment:
It will breaks the statless design of tiering service. I think we can just
ignore the kvSnapshotLeaseId and delegate fluss cluster to do snapshot ttl.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +641,208 @@ public void close() throws IOException {
}
}
+ /**
+ * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the
given table so that
+ * snapshots referenced by these splits will not be cleaned up by the
Fluss server during
+ * tiering. Bucket-snapshot mappings are remembered in {@link
#leasedBucketsByTable} so they can
+ * be released on finish/fail.
+ *
+ * <p>Falls back to a warning (no exception thrown) when the server does
not support the kv
+ * snapshot lease API, to preserve compatibility with older Fluss clusters.
+ */
+ private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit>
tieringSplits) {
+ if (flussAdmin == null) {
Review Comment:
when flussAdmin will be null?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -533,6 +641,208 @@ public void close() throws IOException {
}
}
+ /**
+ * Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the
given table so that
+ * snapshots referenced by these splits will not be cleaned up by the
Fluss server during
+ * tiering. Bucket-snapshot mappings are remembered in {@link
#leasedBucketsByTable} so they can
+ * be released on finish/fail.
+ *
+ * <p>Falls back to a warning (no exception thrown) when the server does
not support the kv
+ * snapshot lease API, to preserve compatibility with older Fluss clusters.
+ */
+ private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit>
tieringSplits) {
+ if (flussAdmin == null) {
+ return;
+ }
+ Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+ for (TieringSplit split : tieringSplits) {
+ if (split.isTieringSnapshotSplit()) {
+ TieringSnapshotSplit snapshotSplit =
split.asTieringSnapshotSplit();
+ bucketsToLease.put(snapshotSplit.getTableBucket(),
snapshotSplit.getSnapshotId());
+ }
+ }
+ if (bucketsToLease.isEmpty()) {
+ return;
+ }
+ LOG.info(
+ "Try to acquire kv snapshot lease {} for tiering table {} with
{} buckets.",
+ kvSnapshotLeaseId,
+ tableId,
+ bucketsToLease.size());
+ try {
+ Set<TableBucket> unavailableBuckets =
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId,
KV_SNAPSHOT_LEASE_DURATION_MS)
+ .acquireSnapshots(bucketsToLease)
+ .get()
+ .getUnavailableTableBucketSet();
+ // Only record successfully leased buckets so we don't later try
to release
+ // buckets that were never actually acquired (e.g. snapshots
already GC'ed or
+ // missing on the server).
+ Set<TableBucket> acquiredBuckets = new
HashSet<>(bucketsToLease.keySet());
+ if (!unavailableBuckets.isEmpty()) {
+ LOG.warn(
+ "Failed to acquire kv snapshot lease for {} of {}
buckets of tiering "
+ + "table {}: {}. The corresponding snapshots
may have already "
+ + "been garbage-collected; tiering for those
buckets may fail "
+ + "later when the snapshots are accessed.",
+ unavailableBuckets.size(),
+ bucketsToLease.size(),
+ tableId,
+ unavailableBuckets);
+ acquiredBuckets.removeAll(unavailableBuckets);
+ }
+ if (!acquiredBuckets.isEmpty()) {
+ leasedBucketsByTable
+ .computeIfAbsent(tableId, k ->
ConcurrentHashMap.newKeySet())
+ .addAll(acquiredBuckets);
+ }
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class).isPresent()) {
+ LOG.warn(
+ "Failed to acquire kv snapshot lease for tiering table
{} because the "
+ + "server does not support kv snapshot lease
API. Snapshots may "
+ + "be cleaned up earlier than expected. Please
upgrade the Fluss "
+ + "server to version 0.9 or later.",
+ tableId,
+ e);
+ } else {
+ LOG.error(
+ "Failed to acquire kv snapshot lease for tiering table
{}. "
+ + "Tiering will proceed without snapshot
protection; the "
+ + "snapshot may be garbage-collected while
tiering is in progress.",
+ tableId,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Release the kv snapshot lease held for a specific table. Called when a
table finishes
+ * tiering, fails, or is abandoned due to failover. Missing leases
(log-only tables, or tables
+ * for which acquire failed) are handled as no-ops.
+ */
+ private void maybeReleaseKvSnapshotLease(long tableId) {
+ // Peek the buckets without removing so that, if the release RPC
fails, we can keep
+ // the entry in leasedBucketsByTable for a later retry instead of
permanently
+ // forgetting which buckets are still leased on the server side.
+ Set<TableBucket> buckets = leasedBucketsByTable.get(tableId);
+ if (flussAdmin == null || buckets == null || buckets.isEmpty()) {
+ // Nothing to release; clean up any empty entry.
+ leasedBucketsByTable.remove(tableId);
+ return;
+ }
+ LOG.info(
+ "Try to release kv snapshot lease {} for tiering table {} with
{} buckets.",
+ kvSnapshotLeaseId,
+ tableId,
+ buckets.size());
+ // Take a defensive copy of the buckets to release so concurrent
updates to the
+ // tracked set do not affect the in-flight RPC payload.
+ Set<TableBucket> bucketsToRelease = new HashSet<>(buckets);
+ try {
+ flussAdmin
+ .createKvSnapshotLease(kvSnapshotLeaseId,
KV_SNAPSHOT_LEASE_DURATION_MS)
+ .releaseSnapshots(bucketsToRelease)
+ .get();
+ // Only drop the bookkeeping entry after the server confirms the
release.
+ Set<TableBucket> tracked = leasedBucketsByTable.get(tableId);
Review Comment:
can we just call `leasedBucketsByTable.remove(tableId);`?
IIUC, `bucketsToRelease` is same with the buckets in the
`leasedBucketsByTable`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]