This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 575db6368 [hotfix][flink] Fix acquire kv snapshot lease cannot work
issue for legacy clusters (#2714)
575db6368 is described below
commit 575db63687793be0086c63f6793bce683e1281f0
Author: yunhong <[email protected]>
AuthorDate: Wed Feb 25 16:39:36 2026 +0800
[hotfix][flink] Fix acquire kv snapshot lease cannot work issue for legacy
clusters (#2714)
---
.../source/enumerator/FlinkSourceEnumerator.java | 83 ++++--
...linkSourceEnumeratorUnsupportedVersionTest.java | 331 +++++++++++++++++++++
2 files changed, 393 insertions(+), 21 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 0b1f549ab..e4e800c67 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -28,6 +28,7 @@ import
org.apache.fluss.client.initializer.SnapshotOffsetsInitializer;
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.UnsupportedVersionException;
import org.apache.fluss.flink.lake.LakeSplitGenerator;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
@@ -607,17 +608,33 @@ public class FlinkSourceEnumerator
kvSnapshotLeaseId,
PhysicalTablePath.of(tablePath, partitionName));
long kvSnapshotLeaseDurationMs =
leaseContext.getKvSnapshotLeaseDurationMs();
- Set<TableBucket> unavailableTableBucketSet =
- flussAdmin
- .createKvSnapshotLease(kvSnapshotLeaseId,
kvSnapshotLeaseDurationMs)
- .acquireSnapshots(bucketsToLease)
- .get()
- .getUnavailableTableBucketSet();
- if (!unavailableTableBucketSet.isEmpty()) {
- LOG.error(
- "Failed to acquire kv snapshot lease for table {}: {}.",
- tablePath,
- unavailableTableBucketSet);
+ try {
+ Set<TableBucket> unavailableTableBucketSet =
+ flussAdmin
+ .createKvSnapshotLease(
+ kvSnapshotLeaseId,
kvSnapshotLeaseDurationMs)
+ .acquireSnapshots(bucketsToLease)
+ .get()
+ .getUnavailableTableBucketSet();
+ if (!unavailableTableBucketSet.isEmpty()) {
+ LOG.error(
+ "Failed to acquire kv snapshot lease for table {}: {}.",
+ tablePath,
+ unavailableTableBucketSet);
+ }
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class)
+ .isPresent()) {
+ LOG.warn(
+ "Failed to acquire kv snapshot lease for table {} because the "
+ + "server does not support kv snapshot lease API. "
+ + "Snapshots may be cleaned up earlier than expected. "
+ + "Please upgrade the Fluss server to version 0.9 or later.",
+ tablePath,
+ e);
+ } else {
+ throw e;
+ }
}
}
} catch (Exception e) {
@@ -1054,10 +1071,21 @@ public class FlinkSourceEnumerator
.releaseSnapshots(consumedKvSnapshots)
.get();
} catch (Exception e) {
- LOG.error("Failed to release kv snapshot lease. These snapshot need to re-enqueue", e);
- // use the current checkpoint id to re-enqueue the buckets
- consumedKvSnapshots.forEach(
- tableBucket -> addConsumedBucket(checkpointId,
tableBucket));
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class).isPresent()) {
+ LOG.warn(
+ "Failed to release kv snapshot lease because the server does not support "
+ + "kv snapshot lease API. Snapshots may remain in storage longer "
+ + "than necessary. Please upgrade the Fluss server to version 0.9 "
+ + "or later.",
+ e);
+ } else {
+ LOG.error(
+ "Failed to release kv snapshot lease. These snapshots need to re-enqueue",
+ e);
+ // use the current checkpoint id to re-enqueue the buckets
+ consumedKvSnapshots.forEach(
+ tableBucket -> addConsumedBucket(checkpointId,
tableBucket));
+ }
}
}
@@ -1114,12 +1142,25 @@ public class FlinkSourceEnumerator
"Dropping kv snapshot lease {} when source enumerator
close. isStreaming {}",
leaseContext.getKvSnapshotLeaseId(),
streaming);
- flussAdmin
- .createKvSnapshotLease(
- leaseContext.getKvSnapshotLeaseId(),
- leaseContext.getKvSnapshotLeaseDurationMs())
- .dropLease()
- .get();
+ try {
+ flussAdmin
+ .createKvSnapshotLease(
+ leaseContext.getKvSnapshotLeaseId(),
+ leaseContext.getKvSnapshotLeaseDurationMs())
+ .dropLease()
+ .get();
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class)
+ .isPresent()) {
+ LOG.warn(
+ "Failed to drop kv snapshot lease because the server does not support "
+ + "kv snapshot lease API. Please upgrade the Fluss server to "
+ + "version 0.9 or later.",
+ e);
+ } else {
+ throw e;
+ }
+ }
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorUnsupportedVersionTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorUnsupportedVersionTest.java
new file mode 100644
index 000000000..9732141e9
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorUnsupportedVersionTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.enumerator;
+
+import org.apache.fluss.client.admin.KvSnapshotLease;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
+import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
+import org.apache.fluss.client.metadata.KvSnapshots;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.UnsupportedVersionException;
+import org.apache.fluss.flink.sink.testutils.TestAdminAdapter;
+import org.apache.fluss.flink.source.reader.LeaseContext;
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests for handling UnsupportedVersionException in FlinkSourceEnumerator. */
+class FlinkSourceEnumeratorUnsupportedVersionTest {
+
+ private static final TablePath TEST_TABLE_PATH =
+ TablePath.of("test_db", "test_table_unsupported_version");
+
+ /**
+ * Tests that getLatestKvSnapshotsAndRegister handles
UnsupportedVersionException gracefully
+ * when server doesn't support ACQUIRE_KV_SNAPSHOT_LEASE API.
+ */
+ @Test
+ void testGetLatestKvSnapshotsAndRegisterWithUnsupportedVersionException()
throws Exception {
+ try (MockSplitEnumeratorContext<SourceSplitBase> context =
+ new MockSplitEnumeratorContext<>(1)) {
+ FlinkSourceEnumerator enumerator = createTestEnumerator(context);
+
+ // Use an admin that returns valid snapshots but fails on
acquireSnapshots
+ setAdminField(enumerator, new
UnsupportedVersionAdminWithSnapshots());
+
+ // Call getLatestKvSnapshotsAndRegister via reflection
+ Method method =
+ FlinkSourceEnumerator.class.getDeclaredMethod(
+ "getLatestKvSnapshotsAndRegister", String.class);
+ method.setAccessible(true);
+
+ // Should not throw, just log warning and return valid snapshots
+ KvSnapshots result = (KvSnapshots) method.invoke(enumerator,
(String) null);
+ assertThat(result).isNotNull();
+ assertThat(result.getTableId()).isEqualTo(1L);
+ assertThat(result.getBucketIds()).containsExactly(0);
+ }
+ }
+
+ /**
+ * Tests that notifyCheckpointComplete handles UnsupportedVersionException
gracefully when
+ * server doesn't support RELEASE_KV_SNAPSHOT_LEASE API.
+ */
+ @Test
+ void testNotifyCheckpointCompleteWithUnsupportedVersionException() throws
Exception {
+ try (MockSplitEnumeratorContext<SourceSplitBase> context =
+ new MockSplitEnumeratorContext<>(1)) {
+ FlinkSourceEnumerator enumerator = createTestEnumerator(context);
+
+ // Set admin via reflection (avoid start() which needs cluster
connection)
+ setAdminField(enumerator, new UnsupportedVersionAdmin());
+
+ // Add a consumed bucket to trigger release
+ TableBucket bucket = new TableBucket(1L, 0);
+ enumerator.addConsumedBucket(1L, bucket);
+
+ // Should not throw exception, just log warning
+ assertThatCode(() -> enumerator.notifyCheckpointComplete(1L))
+ .doesNotThrowAnyException();
+ }
+ }
+
+ /**
+ * Tests that maybeDropKvSnapshotLease handles UnsupportedVersionException
gracefully when
+ * server doesn't support DROP_KV_SNAPSHOT_LEASE API.
+ */
+ @Test
+ void testMaybeDropKvSnapshotLeaseWithUnsupportedVersionException() throws
Exception {
+ try (MockSplitEnumeratorContext<SourceSplitBase> context =
+ new MockSplitEnumeratorContext<>(1)) {
+ FlinkSourceEnumerator enumerator =
+ new FlinkSourceEnumerator(
+ TEST_TABLE_PATH,
+ new Configuration(),
+ true,
+ false,
+ context,
+ OffsetsInitializer.full(),
+ 0L,
+ true,
+ null,
+ null,
+ LeaseContext.DEFAULT,
+ false);
+
+ // Set admin via reflection (avoid start() which needs cluster
connection)
+ setAdminField(enumerator, new UnsupportedVersionAdmin());
+
+ // Should not throw exception, just log warning
+ assertThatCode(enumerator::close).doesNotThrowAnyException();
+ }
+ }
+
+ /**
+ * Tests that other exceptions in releaseSnapshots still cause re-enqueue
of consumed buckets.
+ */
+ @Test
+ void testNotifyCheckpointCompleteWithOtherException() throws Exception {
+ try (MockSplitEnumeratorContext<SourceSplitBase> context =
+ new MockSplitEnumeratorContext<>(1)) {
+ FlinkSourceEnumerator enumerator = createTestEnumerator(context);
+
+ // Set admin via reflection (avoid start() which needs cluster
connection)
+ setAdminField(enumerator, new FailingAdmin(new
RuntimeException("Network error")));
+
+ // Add a consumed bucket to trigger release
+ TableBucket bucket = new TableBucket(1L, 0);
+ enumerator.addConsumedBucket(1L, bucket);
+
+ // Should not throw exception but should re-enqueue the bucket
+ assertThatCode(() -> enumerator.notifyCheckpointComplete(1L))
+ .doesNotThrowAnyException();
+
+ // Verify bucket was re-enqueued (checkpoint 1L should have the
bucket)
+ Set<TableBucket> reenqueuedBuckets =
enumerator.getAndRemoveConsumedBucketsUpTo(2L);
+ assertThat(reenqueuedBuckets).contains(bucket);
+ }
+ }
+
+ private FlinkSourceEnumerator createTestEnumerator(
+ MockSplitEnumeratorContext<SourceSplitBase> context) {
+ return new FlinkSourceEnumerator(
+ TEST_TABLE_PATH,
+ new Configuration(),
+ true,
+ false,
+ context,
+ OffsetsInitializer.full(),
+ 0L,
+ true,
+ null,
+ null,
+ LeaseContext.DEFAULT,
+ true);
+ }
+
+ private void setAdminField(FlinkSourceEnumerator enumerator,
TestAdminAdapter admin)
+ throws Exception {
+ Field adminField =
FlinkSourceEnumerator.class.getDeclaredField("flussAdmin");
+ adminField.setAccessible(true);
+ adminField.set(enumerator, admin);
+ }
+
+ //
-------------------------------------------------------------------------
+ // Test Admin implementations
+ //
-------------------------------------------------------------------------
+
+ /**
+ * A test Admin that returns a {@link KvSnapshotLease} whose lease
operations all fail with
+ * {@link UnsupportedVersionException}, simulating an old Fluss server
that does not support the
+ * kv snapshot lease API.
+ */
+ private static class UnsupportedVersionAdmin extends TestAdminAdapter {
+ @Override
+ public KvSnapshotLease createKvSnapshotLease(String leaseId, long
leaseDurationMs) {
+ return new UnsupportedVersionKvSnapshotLease(leaseId,
leaseDurationMs);
+ }
+ }
+
+ /**
+ * Extends {@link UnsupportedVersionAdmin} by additionally supporting
{@code
+ * getLatestKvSnapshots}, so that the {@code
getLatestKvSnapshotsAndRegister} code path can
+ * reach the {@code acquireSnapshots} call.
+ */
+ private static class UnsupportedVersionAdminWithSnapshots extends
UnsupportedVersionAdmin {
+ @Override
+ public CompletableFuture<KvSnapshots> getLatestKvSnapshots(TablePath
tablePath) {
+ Map<Integer, Long> snapshotIds = new HashMap<>();
+ snapshotIds.put(0, 1L);
+ Map<Integer, Long> logOffsets = new HashMap<>();
+ logOffsets.put(0, 0L);
+ return CompletableFuture.completedFuture(
+ new KvSnapshots(1L, null, snapshotIds, logOffsets));
+ }
+ }
+
+ /**
+ * A test Admin that returns a {@link KvSnapshotLease} whose lease
operations all fail with the
+ * given exception, used to verify non-{@link UnsupportedVersionException}
error handling.
+ */
+ private static class FailingAdmin extends TestAdminAdapter {
+ private final Exception cause;
+
+ FailingAdmin(Exception cause) {
+ this.cause = cause;
+ }
+
+ @Override
+ public KvSnapshotLease createKvSnapshotLease(String leaseId, long
leaseDurationMs) {
+ return new FailingKvSnapshotLease(leaseId, leaseDurationMs, cause);
+ }
+ }
+
+ /** A {@link KvSnapshotLease} that fails all operations with
UnsupportedVersionException. */
+ private static class UnsupportedVersionKvSnapshotLease implements
KvSnapshotLease {
+ private final String leaseId;
+ private final long leaseDurationMs;
+
+ UnsupportedVersionKvSnapshotLease(String leaseId, long
leaseDurationMs) {
+ this.leaseId = leaseId;
+ this.leaseDurationMs = leaseDurationMs;
+ }
+
+ @Override
+ public String leaseId() {
+ return leaseId;
+ }
+
+ @Override
+ public long leaseDurationMs() {
+ return leaseDurationMs;
+ }
+
+ @Override
+ public CompletableFuture<AcquireKvSnapshotLeaseResult>
acquireSnapshots(
+ Map<TableBucket, Long> snapshotIds) {
+ return failedFuture(
+ new UnsupportedVersionException(
+ "The server does not support ACQUIRE_KV_SNAPSHOT_LEASE(1056)"));
+ }
+
+ @Override
+ public CompletableFuture<Void> renew() {
+ return failedFuture(
+ new UnsupportedVersionException(
+ "The server does not support RENEW_KV_SNAPSHOT_LEASE"));
+ }
+
+ @Override
+ public CompletableFuture<Void> releaseSnapshots(Set<TableBucket>
bucketsToRelease) {
+ return failedFuture(
+ new UnsupportedVersionException(
+ "The server does not support RELEASE_KV_SNAPSHOT_LEASE(1057)"));
+ }
+
+ @Override
+ public CompletableFuture<Void> dropLease() {
+ return failedFuture(
+ new UnsupportedVersionException(
+ "The server does not support DROP_KV_SNAPSHOT_LEASE(1058)"));
+ }
+ }
+
+ /** A {@link KvSnapshotLease} that fails all operations with the given
exception. */
+ private static class FailingKvSnapshotLease implements KvSnapshotLease {
+ private final String leaseId;
+ private final long leaseDurationMs;
+ private final Exception cause;
+
+ FailingKvSnapshotLease(String leaseId, long leaseDurationMs, Exception
cause) {
+ this.leaseId = leaseId;
+ this.leaseDurationMs = leaseDurationMs;
+ this.cause = cause;
+ }
+
+ @Override
+ public String leaseId() {
+ return leaseId;
+ }
+
+ @Override
+ public long leaseDurationMs() {
+ return leaseDurationMs;
+ }
+
+ @Override
+ public CompletableFuture<AcquireKvSnapshotLeaseResult>
acquireSnapshots(
+ Map<TableBucket, Long> snapshotIds) {
+ return failedFuture(cause);
+ }
+
+ @Override
+ public CompletableFuture<Void> renew() {
+ return failedFuture(cause);
+ }
+
+ @Override
+ public CompletableFuture<Void> releaseSnapshots(Set<TableBucket>
bucketsToRelease) {
+ return failedFuture(cause);
+ }
+
+ @Override
+ public CompletableFuture<Void> dropLease() {
+ return failedFuture(cause);
+ }
+ }
+
+ private static <T> CompletableFuture<T> failedFuture(Throwable t) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ future.completeExceptionally(t);
+ return future;
+ }
+}