This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 575db6368 [hotfix][flink] Fix acquire kv snapshot lease cannot work 
issue for legacy clusters (#2714)
575db6368 is described below

commit 575db63687793be0086c63f6793bce683e1281f0
Author: yunhong <[email protected]>
AuthorDate: Wed Feb 25 16:39:36 2026 +0800

    [hotfix][flink] Fix acquire kv snapshot lease cannot work issue for legacy 
clusters (#2714)
---
 .../source/enumerator/FlinkSourceEnumerator.java   |  83 ++++--
 ...linkSourceEnumeratorUnsupportedVersionTest.java | 331 +++++++++++++++++++++
 2 files changed, 393 insertions(+), 21 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 0b1f549ab..e4e800c67 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -28,6 +28,7 @@ import 
org.apache.fluss.client.initializer.SnapshotOffsetsInitializer;
 import org.apache.fluss.client.metadata.KvSnapshots;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.UnsupportedVersionException;
 import org.apache.fluss.flink.lake.LakeSplitGenerator;
 import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
@@ -607,17 +608,33 @@ public class FlinkSourceEnumerator
                         kvSnapshotLeaseId,
                         PhysicalTablePath.of(tablePath, partitionName));
                 long kvSnapshotLeaseDurationMs = 
leaseContext.getKvSnapshotLeaseDurationMs();
-                Set<TableBucket> unavailableTableBucketSet =
-                        flussAdmin
-                                .createKvSnapshotLease(kvSnapshotLeaseId, 
kvSnapshotLeaseDurationMs)
-                                .acquireSnapshots(bucketsToLease)
-                                .get()
-                                .getUnavailableTableBucketSet();
-                if (!unavailableTableBucketSet.isEmpty()) {
-                    LOG.error(
-                            "Failed to acquire kv snapshot lease for table {}: 
{}.",
-                            tablePath,
-                            unavailableTableBucketSet);
+                try {
+                    Set<TableBucket> unavailableTableBucketSet =
+                            flussAdmin
+                                    .createKvSnapshotLease(
+                                            kvSnapshotLeaseId, 
kvSnapshotLeaseDurationMs)
+                                    .acquireSnapshots(bucketsToLease)
+                                    .get()
+                                    .getUnavailableTableBucketSet();
+                    if (!unavailableTableBucketSet.isEmpty()) {
+                        LOG.error(
+                                "Failed to acquire kv snapshot lease for table 
{}: {}.",
+                                tablePath,
+                                unavailableTableBucketSet);
+                    }
+                } catch (Exception e) {
+                    if (ExceptionUtils.findThrowable(e, 
UnsupportedVersionException.class)
+                            .isPresent()) {
+                        LOG.warn(
+                                "Failed to acquire kv snapshot lease for table 
{} because the "
+                                        + "server does not support kv snapshot 
lease API. "
+                                        + "Snapshots may be cleaned up earlier 
than expected. "
+                                        + "Please upgrade the Fluss server to 
version 0.9 or later.",
+                                tablePath,
+                                e);
+                    } else {
+                        throw e;
+                    }
                 }
             }
         } catch (Exception e) {
@@ -1054,10 +1071,21 @@ public class FlinkSourceEnumerator
                     .releaseSnapshots(consumedKvSnapshots)
                     .get();
         } catch (Exception e) {
-            LOG.error("Failed to release kv snapshot lease. These snapshot 
need to re-enqueue", e);
-            // use the current checkpoint id to re-enqueue the buckets
-            consumedKvSnapshots.forEach(
-                    tableBucket -> addConsumedBucket(checkpointId, 
tableBucket));
+            if (ExceptionUtils.findThrowable(e, 
UnsupportedVersionException.class).isPresent()) {
+                LOG.warn(
+                        "Failed to release kv snapshot lease because the 
server does not support "
+                                + "kv snapshot lease API. Snapshots may remain 
in storage longer "
+                                + "than necessary. Please upgrade the Fluss 
server to version 0.9 "
+                                + "or later.",
+                        e);
+            } else {
+                LOG.error(
+                        "Failed to release kv snapshot lease. These snapshots 
need to re-enqueue",
+                        e);
+                // use the current checkpoint id to re-enqueue the buckets
+                consumedKvSnapshots.forEach(
+                        tableBucket -> addConsumedBucket(checkpointId, 
tableBucket));
+            }
         }
     }
 
@@ -1114,12 +1142,25 @@ public class FlinkSourceEnumerator
                     "Dropping kv snapshot lease {} when source enumerator 
close. isStreaming {}",
                     leaseContext.getKvSnapshotLeaseId(),
                     streaming);
-            flussAdmin
-                    .createKvSnapshotLease(
-                            leaseContext.getKvSnapshotLeaseId(),
-                            leaseContext.getKvSnapshotLeaseDurationMs())
-                    .dropLease()
-                    .get();
+            try {
+                flussAdmin
+                        .createKvSnapshotLease(
+                                leaseContext.getKvSnapshotLeaseId(),
+                                leaseContext.getKvSnapshotLeaseDurationMs())
+                        .dropLease()
+                        .get();
+            } catch (Exception e) {
+                if (ExceptionUtils.findThrowable(e, 
UnsupportedVersionException.class)
+                        .isPresent()) {
+                    LOG.warn(
+                            "Failed to drop kv snapshot lease because the 
server does not support "
+                                    + "kv snapshot lease API. Please upgrade 
the Fluss server to "
+                                    + "version 0.9 or later.",
+                            e);
+                } else {
+                    throw e;
+                }
+            }
         }
     }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorUnsupportedVersionTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorUnsupportedVersionTest.java
new file mode 100644
index 000000000..9732141e9
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorUnsupportedVersionTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.enumerator;
+
+import org.apache.fluss.client.admin.KvSnapshotLease;
+import org.apache.fluss.client.initializer.OffsetsInitializer;
+import org.apache.fluss.client.metadata.AcquireKvSnapshotLeaseResult;
+import org.apache.fluss.client.metadata.KvSnapshots;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.UnsupportedVersionException;
+import org.apache.fluss.flink.sink.testutils.TestAdminAdapter;
+import org.apache.fluss.flink.source.reader.LeaseContext;
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Tests for handling UnsupportedVersionException in FlinkSourceEnumerator. */
+class FlinkSourceEnumeratorUnsupportedVersionTest {
+
+    private static final TablePath TEST_TABLE_PATH =
+            TablePath.of("test_db", "test_table_unsupported_version");
+
+    /**
+     * Tests that getLatestKvSnapshotsAndRegister handles UnsupportedVersionException gracefully
+     * when the server doesn't support the ACQUIRE_KV_SNAPSHOT_LEASE API.
+     */
+    @Test
+    void testGetLatestKvSnapshotsAndRegisterWithUnsupportedVersionException() 
throws Exception {
+        try (MockSplitEnumeratorContext<SourceSplitBase> context =
+                new MockSplitEnumeratorContext<>(1)) {
+            FlinkSourceEnumerator enumerator = createTestEnumerator(context);
+
+            // Use an admin that returns valid snapshots but fails on acquireSnapshots
+            setAdminField(enumerator, new 
UnsupportedVersionAdminWithSnapshots());
+
+            // Call getLatestKvSnapshotsAndRegister via reflection
+            Method method =
+                    FlinkSourceEnumerator.class.getDeclaredMethod(
+                            "getLatestKvSnapshotsAndRegister", String.class);
+            method.setAccessible(true);
+
+            // Should not throw, just log warning and return valid snapshots
+            KvSnapshots result = (KvSnapshots) method.invoke(enumerator, 
(String) null);
+            assertThat(result).isNotNull();
+            assertThat(result.getTableId()).isEqualTo(1L);
+            assertThat(result.getBucketIds()).containsExactly(0);
+        }
+    }
+
+    /**
+     * Tests that notifyCheckpointComplete handles UnsupportedVersionException gracefully when
+     * the server doesn't support the RELEASE_KV_SNAPSHOT_LEASE API.
+     */
+    @Test
+    void testNotifyCheckpointCompleteWithUnsupportedVersionException() throws 
Exception {
+        try (MockSplitEnumeratorContext<SourceSplitBase> context =
+                new MockSplitEnumeratorContext<>(1)) {
+            FlinkSourceEnumerator enumerator = createTestEnumerator(context);
+
+            // Set admin via reflection (avoid start() which needs cluster connection)
+            setAdminField(enumerator, new UnsupportedVersionAdmin());
+
+            // Add a consumed bucket to trigger release
+            TableBucket bucket = new TableBucket(1L, 0);
+            enumerator.addConsumedBucket(1L, bucket);
+
+            // Should not throw exception, just log warning
+            assertThatCode(() -> enumerator.notifyCheckpointComplete(1L))
+                    .doesNotThrowAnyException();
+        }
+    }
+
+    /**
+     * Tests that maybeDropKvSnapshotLease handles UnsupportedVersionException gracefully when
+     * the server doesn't support the DROP_KV_SNAPSHOT_LEASE API.
+     */
+    @Test
+    void testMaybeDropKvSnapshotLeaseWithUnsupportedVersionException() throws 
Exception {
+        try (MockSplitEnumeratorContext<SourceSplitBase> context =
+                new MockSplitEnumeratorContext<>(1)) {
+            FlinkSourceEnumerator enumerator =
+                    new FlinkSourceEnumerator(
+                            TEST_TABLE_PATH,
+                            new Configuration(),
+                            true,
+                            false,
+                            context,
+                            OffsetsInitializer.full(),
+                            0L,
+                            true,
+                            null,
+                            null,
+                            LeaseContext.DEFAULT,
+                            false);
+
+            // Set admin via reflection (avoid start() which needs cluster connection)
+            setAdminField(enumerator, new UnsupportedVersionAdmin());
+
+            // Should not throw exception, just log warning
+            assertThatCode(enumerator::close).doesNotThrowAnyException();
+        }
+    }
+
+    /**
+     * Tests that other exceptions in releaseSnapshots still cause re-enqueue of consumed buckets.
+     */
+    @Test
+    void testNotifyCheckpointCompleteWithOtherException() throws Exception {
+        try (MockSplitEnumeratorContext<SourceSplitBase> context =
+                new MockSplitEnumeratorContext<>(1)) {
+            FlinkSourceEnumerator enumerator = createTestEnumerator(context);
+
+            // Set admin via reflection (avoid start() which needs cluster connection)
+            setAdminField(enumerator, new FailingAdmin(new 
RuntimeException("Network error")));
+
+            // Add a consumed bucket to trigger release
+            TableBucket bucket = new TableBucket(1L, 0);
+            enumerator.addConsumedBucket(1L, bucket);
+
+            // Should not throw exception but should re-enqueue the bucket
+            assertThatCode(() -> enumerator.notifyCheckpointComplete(1L))
+                    .doesNotThrowAnyException();
+
+            // Verify bucket was re-enqueued (checkpoint 1L should have the bucket)
+            Set<TableBucket> reenqueuedBuckets = 
enumerator.getAndRemoveConsumedBucketsUpTo(2L);
+            assertThat(reenqueuedBuckets).contains(bucket);
+        }
+    }
+
+    private FlinkSourceEnumerator createTestEnumerator(
+            MockSplitEnumeratorContext<SourceSplitBase> context) {
+        return new FlinkSourceEnumerator(
+                TEST_TABLE_PATH,
+                new Configuration(),
+                true,
+                false,
+                context,
+                OffsetsInitializer.full(),
+                0L,
+                true,
+                null,
+                null,
+                LeaseContext.DEFAULT,
+                true);
+    }
+
+    private void setAdminField(FlinkSourceEnumerator enumerator, 
TestAdminAdapter admin)
+            throws Exception {
+        Field adminField = 
FlinkSourceEnumerator.class.getDeclaredField("flussAdmin");
+        adminField.setAccessible(true);
+        adminField.set(enumerator, admin);
+    }
+
+    // -------------------------------------------------------------------------
+    //  Test Admin implementations
+    // -------------------------------------------------------------------------
+
+    /**
+     * A test Admin that returns a {@link KvSnapshotLease} whose lease operations all fail with
+     * {@link UnsupportedVersionException}, simulating an old Fluss server that does not support the
+     * kv snapshot lease API.
+     */
+    private static class UnsupportedVersionAdmin extends TestAdminAdapter {
+        @Override
+        public KvSnapshotLease createKvSnapshotLease(String leaseId, long 
leaseDurationMs) {
+            return new UnsupportedVersionKvSnapshotLease(leaseId, 
leaseDurationMs);
+        }
+    }
+
+    /**
+     * Extends {@link UnsupportedVersionAdmin} by additionally supporting {@code
+     * getLatestKvSnapshots}, so that the {@code getLatestKvSnapshotsAndRegister} code path can
+     * reach the {@code acquireSnapshots} call.
+     */
+    private static class UnsupportedVersionAdminWithSnapshots extends 
UnsupportedVersionAdmin {
+        @Override
+        public CompletableFuture<KvSnapshots> getLatestKvSnapshots(TablePath 
tablePath) {
+            Map<Integer, Long> snapshotIds = new HashMap<>();
+            snapshotIds.put(0, 1L);
+            Map<Integer, Long> logOffsets = new HashMap<>();
+            logOffsets.put(0, 0L);
+            return CompletableFuture.completedFuture(
+                    new KvSnapshots(1L, null, snapshotIds, logOffsets));
+        }
+    }
+
+    /**
+     * A test Admin that returns a {@link KvSnapshotLease} whose lease operations all fail with the
+     * given exception, used to verify non-{@link UnsupportedVersionException} error handling.
+     */
+    private static class FailingAdmin extends TestAdminAdapter {
+        private final Exception cause;
+
+        FailingAdmin(Exception cause) {
+            this.cause = cause;
+        }
+
+        @Override
+        public KvSnapshotLease createKvSnapshotLease(String leaseId, long 
leaseDurationMs) {
+            return new FailingKvSnapshotLease(leaseId, leaseDurationMs, cause);
+        }
+    }
+
+    /** A {@link KvSnapshotLease} that fails all operations with UnsupportedVersionException. */
+    private static class UnsupportedVersionKvSnapshotLease implements 
KvSnapshotLease {
+        private final String leaseId;
+        private final long leaseDurationMs;
+
+        UnsupportedVersionKvSnapshotLease(String leaseId, long 
leaseDurationMs) {
+            this.leaseId = leaseId;
+            this.leaseDurationMs = leaseDurationMs;
+        }
+
+        @Override
+        public String leaseId() {
+            return leaseId;
+        }
+
+        @Override
+        public long leaseDurationMs() {
+            return leaseDurationMs;
+        }
+
+        @Override
+        public CompletableFuture<AcquireKvSnapshotLeaseResult> 
acquireSnapshots(
+                Map<TableBucket, Long> snapshotIds) {
+            return failedFuture(
+                    new UnsupportedVersionException(
+                            "The server does not support 
ACQUIRE_KV_SNAPSHOT_LEASE(1056)"));
+        }
+
+        @Override
+        public CompletableFuture<Void> renew() {
+            return failedFuture(
+                    new UnsupportedVersionException(
+                            "The server does not support 
RENEW_KV_SNAPSHOT_LEASE"));
+        }
+
+        @Override
+        public CompletableFuture<Void> releaseSnapshots(Set<TableBucket> 
bucketsToRelease) {
+            return failedFuture(
+                    new UnsupportedVersionException(
+                            "The server does not support 
RELEASE_KV_SNAPSHOT_LEASE(1057)"));
+        }
+
+        @Override
+        public CompletableFuture<Void> dropLease() {
+            return failedFuture(
+                    new UnsupportedVersionException(
+                            "The server does not support 
DROP_KV_SNAPSHOT_LEASE(1058)"));
+        }
+    }
+
+    /** A {@link KvSnapshotLease} that fails all operations with the given exception. */
+    private static class FailingKvSnapshotLease implements KvSnapshotLease {
+        private final String leaseId;
+        private final long leaseDurationMs;
+        private final Exception cause;
+
+        FailingKvSnapshotLease(String leaseId, long leaseDurationMs, Exception 
cause) {
+            this.leaseId = leaseId;
+            this.leaseDurationMs = leaseDurationMs;
+            this.cause = cause;
+        }
+
+        @Override
+        public String leaseId() {
+            return leaseId;
+        }
+
+        @Override
+        public long leaseDurationMs() {
+            return leaseDurationMs;
+        }
+
+        @Override
+        public CompletableFuture<AcquireKvSnapshotLeaseResult> 
acquireSnapshots(
+                Map<TableBucket, Long> snapshotIds) {
+            return failedFuture(cause);
+        }
+
+        @Override
+        public CompletableFuture<Void> renew() {
+            return failedFuture(cause);
+        }
+
+        @Override
+        public CompletableFuture<Void> releaseSnapshots(Set<TableBucket> 
bucketsToRelease) {
+            return failedFuture(cause);
+        }
+
+        @Override
+        public CompletableFuture<Void> dropLease() {
+            return failedFuture(cause);
+        }
+    }
+
+    private static <T> CompletableFuture<T> failedFuture(Throwable t) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        future.completeExceptionally(t);
+        return future;
+    }
+}

Reply via email to