This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new c6d9fbb51 [flink] Drop kv snapshot lease when no checkpoint triggered before (#2610)
c6d9fbb51 is described below

commit c6d9fbb515b41fb0cf7259735367265875ba5dc2
Author: yunhong <[email protected]>
AuthorDate: Tue Feb 10 20:42:22 2026 +0800

    [flink] Drop kv snapshot lease when no checkpoint triggered before (#2610)
---
 .../org/apache/fluss/flink/source/FlinkSource.java |  6 +-
 .../source/enumerator/FlinkSourceEnumerator.java   | 78 +++++++++++++++++-----
 .../fluss/flink/source/FlinkTableSourceITCase.java | 36 +++++++---
 .../enumerator/FlinkSourceEnumeratorTest.java      | 27 +++++---
 4 files changed, 110 insertions(+), 37 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index fba394edf..0aeb0d59c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -150,7 +150,8 @@ public class FlinkSource<OUT>
                 streaming,
                 partitionFilters,
                 lakeSource,
-                leaseContext);
+                leaseContext,
+                false);
     }
 
     @Override
@@ -173,7 +174,8 @@ public class FlinkSource<OUT>
                 lakeSource,
                 new LeaseContext(
                         sourceEnumeratorState.getLeaseId(),
-                        leaseContext.getKvSnapshotLeaseDurationMs()));
+                        leaseContext.getKvSnapshotLeaseDurationMs()),
+                true);
     }
 
     @Override
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 79ad2e1ab..0b1f549ab 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -31,6 +31,7 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.lake.LakeSplitGenerator;
 import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
+import org.apache.fluss.flink.source.FlinkSource;
 import org.apache.fluss.flink.source.event.FinishedKvSnapshotConsumeEvent;
 import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
 import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
@@ -156,6 +157,27 @@ public class FlinkSourceEnumerator
 
     private volatile boolean closed = false;
 
+    /**
+     * Whether a checkpoint has been successfully completed before.
+     *
+     * <p>This flag is used in {@link #close()} to decide whether the kv snapshot lease should be
+     * dropped:
+     *
+     * <ul>
+     *   <li>If {@code false} (no checkpoint completed), the lease ID has not been persisted to
+     *       checkpoint state, so it is safe to drop the lease on close — no future restore will
+     *       reference it.
+     *   <li>If {@code true} (at least one checkpoint completed), the lease ID has been persisted
+     *       and may be restored via {@link FlinkSource#restoreEnumerator}. Dropping the lease on
+     *       close would invalidate the restored lease, so it must be kept.
+     * </ul>
+     *
+     * <p>This field is always initialized to {@code true} when the enumerator is restored from
+     * checkpointed state (i.e., via {@link FlinkSource#restoreEnumerator}), and set to {@code
+     * true} in {@link #notifyCheckpointComplete(long)} upon the first successful checkpoint.
+     */
+    private volatile boolean checkpointTriggeredBefore;
+
     @Nullable private final Predicate partitionFilters;
 
     @Nullable private final LakeSource<LakeSplit> lakeSource;
@@ -171,7 +193,8 @@ public class FlinkSourceEnumerator
             boolean streaming,
             @Nullable Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource,
-            LeaseContext leaseContext) {
+            LeaseContext leaseContext,
+            boolean checkpointTriggeredBefore) {
         this(
                 tablePath,
                 flussConf,
@@ -186,7 +209,8 @@ public class FlinkSourceEnumerator
                 streaming,
                 partitionFilters,
                 lakeSource,
-                leaseContext);
+                leaseContext,
+                checkpointTriggeredBefore);
     }
 
     public FlinkSourceEnumerator(
@@ -203,7 +227,8 @@ public class FlinkSourceEnumerator
             boolean streaming,
             @Nullable Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource,
-            LeaseContext leaseContext) {
+            LeaseContext leaseContext,
+            boolean checkpointTriggeredBefore) {
         this(
                 tablePath,
                 flussConf,
@@ -219,7 +244,8 @@ public class FlinkSourceEnumerator
                 partitionFilters,
                 lakeSource,
                 new WorkerExecutor(context),
-                leaseContext);
+                leaseContext,
+                checkpointTriggeredBefore);
     }
 
     FlinkSourceEnumerator(
@@ -237,7 +263,8 @@ public class FlinkSourceEnumerator
             @Nullable Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource,
             WorkerExecutor workerExecutor,
-            LeaseContext leaseContext) {
+            LeaseContext leaseContext,
+            boolean checkpointTriggeredBefore) {
         this.tablePath = checkNotNull(tablePath);
         this.flussConf = checkNotNull(flussConf);
         this.hasPrimaryKey = hasPrimaryKey;
@@ -259,6 +286,7 @@ public class FlinkSourceEnumerator
         this.lakeSource = lakeSource;
         this.workerExecutor = workerExecutor;
         this.leaseContext = leaseContext;
+        this.checkpointTriggeredBefore = checkpointTriggeredBefore;
     }
 
     @Override
@@ -1007,6 +1035,8 @@ public class FlinkSourceEnumerator
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        checkpointTriggeredBefore = true;
+
         // lower than this checkpoint id.
         Set<TableBucket> consumedKvSnapshots = 
getAndRemoveConsumedBucketsUpTo(checkpointId);
 
@@ -1051,26 +1081,18 @@ public class FlinkSourceEnumerator
     @Override
     public void close() throws IOException {
         try {
-            closed = true;
+            maybeDropKvSnapshotLease();
 
-            if (!streaming
-                    && hasPrimaryKey
-                    && startingOffsetsInitializer instanceof 
SnapshotOffsetsInitializer) {
-                // drop the kv snapshot lease for the batch mode.
-                flussAdmin
-                        .createKvSnapshotLease(
-                                leaseContext.getKvSnapshotLeaseId(),
-                                leaseContext.getKvSnapshotLeaseDurationMs())
-                        .dropLease()
-                        .get();
-            }
+            closed = true;
 
             if (workerExecutor != null) {
                 workerExecutor.close();
             }
+
             if (flussAdmin != null) {
                 flussAdmin.close();
             }
+
             if (connection != null) {
                 connection.close();
             }
@@ -1079,6 +1101,28 @@ public class FlinkSourceEnumerator
         }
     }
 
+    private void maybeDropKvSnapshotLease() throws Exception {
+        if (flussAdmin != null
+                && hasPrimaryKey
+                && startingOffsetsInitializer instanceof 
SnapshotOffsetsInitializer
+                && !checkpointTriggeredBefore) {
+            // 1. Drop the kv snapshot lease for the batch mode.
+            // 2. For streaming mode, if no checkpoint was triggered, the 
lease ID
+            // has not been persisted to state. It won't be restored on 
restart,
+            // so it's safe to drop it now.
+            LOG.info(
+                    "Dropping kv snapshot lease {} when source enumerator 
close. isStreaming {}",
+                    leaseContext.getKvSnapshotLeaseId(),
+                    streaming);
+            flussAdmin
+                    .createKvSnapshotLease(
+                            leaseContext.getKvSnapshotLeaseId(),
+                            leaseContext.getKvSnapshotLeaseDurationMs())
+                    .dropLease()
+                    .get();
+        }
+    }
+
     // --------------- private class ---------------
     /** A container class to hold the newly added partitions and removed 
partitions. */
     private static class PartitionChange {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index 935190092..d13dcc8db 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -372,9 +372,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
 
         List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", 
"+I[3, v3]");
         org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(
-                                "select * from pk_table_with_kv_snapshot_lease 
/*+ OPTIONS('scan.kv.snapshot.lease.id' = 'test-consumer-1') */")
-                        .collect();
+                tEnv.executeSql("select * from 
pk_table_with_kv_snapshot_lease").collect();
         assertResultsIgnoreOrder(rowIter, expectedRows, false);
 
         // now, we put rows to the table again, should read the log
@@ -393,12 +391,32 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
         ZooKeeperClient zkClient = 
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
         retry(
                 Duration.ofMinutes(1),
-                () -> {
-                    
assertThat(zkClient.getKvSnapshotLeaseMetadata("test-consumer-1"))
-                            .isNotPresent();
-                    
assertThat(zkClient.getKvSnapshotLeasesList().contains("test-consumer-1"))
-                            .isFalse();
-                });
+                () -> 
assertThat(zkClient.getKvSnapshotLeasesList().isEmpty()).isTrue());
+    }
+
+    @Test
+    void testReadWithKvSnapshotLeaseNoCheckpoint() throws Exception {
+        tEnv.executeSql(
+                "create table pk_table_with_kv_snapshot_lease2 (a int not null 
primary key not enforced, b varchar)");
+        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"pk_table_with_kv_snapshot_lease2");
+
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
+
+        // write records
+        writeRows(conn, tablePath, rows, false);
+
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
+        List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", 
"+I[3, v3]");
+        org.apache.flink.util.CloseableIterator<Row> rowIter =
+                tEnv.executeSql("select * from 
pk_table_with_kv_snapshot_lease2").collect();
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+
+        // check lease will be dropped after job finished.
+        ZooKeeperClient zkClient = 
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        retry(
+                Duration.ofMinutes(1),
+                () -> 
assertThat(zkClient.getKvSnapshotLeasesList().isEmpty()).isTrue());
     }
 
     // 
-------------------------------------------------------------------------------------
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index eb526e4e5..c89514243 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -115,7 +115,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             streaming,
                             null,
                             null,
-                            LeaseContext.DEFAULT);
+                            LeaseContext.DEFAULT,
+                            false);
 
             enumerator.start();
 
@@ -164,7 +165,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             streaming,
                             null,
                             null,
-                            LeaseContext.DEFAULT);
+                            LeaseContext.DEFAULT,
+                            false);
             enumerator.start();
             // register all read
             for (int i = 0; i < numSubtasks; i++) {
@@ -237,7 +239,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             streaming,
                             null,
                             null,
-                            LeaseContext.DEFAULT);
+                            LeaseContext.DEFAULT,
+                            false);
 
             enumerator.start();
 
@@ -285,7 +288,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             streaming,
                             null,
                             null,
-                            LeaseContext.DEFAULT);
+                            LeaseContext.DEFAULT,
+                            false);
 
             enumerator.start();
 
@@ -323,7 +327,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             streaming,
                             null,
                             null,
-                            LeaseContext.DEFAULT);
+                            LeaseContext.DEFAULT,
+                            false);
 
             enumerator.start();
 
@@ -385,7 +390,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             streaming,
                             null,
                             null,
-                            LeaseContext.DEFAULT);
+                            LeaseContext.DEFAULT,
+                            true);
 
             enumerator.start();
             assertThat(context.getSplitsAssignmentSequence()).isEmpty();
@@ -435,7 +441,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                                 null,
                                 null,
                                 workExecutor,
-                                LeaseContext.DEFAULT)) {
+                                LeaseContext.DEFAULT,
+                                false)) {
 
             Map<Long, String> partitionNameByIds =
                     waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
@@ -553,7 +560,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                                 streaming,
                                 null,
                                 null,
-                                LeaseContext.DEFAULT)) {
+                                LeaseContext.DEFAULT,
+                                false)) {
 
             // test splits for same non-partitioned bucket, should assign to 
same task
             TableBucket t1 = new TableBucket(tableId, 0);
@@ -667,7 +675,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                                 null,
                                 lakeSource,
                                 workExecutor,
-                                LeaseContext.DEFAULT)) {
+                                LeaseContext.DEFAULT,
+                                false)) {
             enumerator.start();
 
             // Remove the hybrid partition to mock expire after enumerator 
start

Reply via email to