This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new c6d9fbb51 [flink] Drop kv snapshot lease when no checkpoint triggered
before (#2610)
c6d9fbb51 is described below
commit c6d9fbb515b41fb0cf7259735367265875ba5dc2
Author: yunhong <[email protected]>
AuthorDate: Tue Feb 10 20:42:22 2026 +0800
[flink] Drop kv snapshot lease when no checkpoint triggered before (#2610)
---
.../org/apache/fluss/flink/source/FlinkSource.java | 6 +-
.../source/enumerator/FlinkSourceEnumerator.java | 78 +++++++++++++++++-----
.../fluss/flink/source/FlinkTableSourceITCase.java | 36 +++++++---
.../enumerator/FlinkSourceEnumeratorTest.java | 27 +++++---
4 files changed, 110 insertions(+), 37 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index fba394edf..0aeb0d59c 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -150,7 +150,8 @@ public class FlinkSource<OUT>
streaming,
partitionFilters,
lakeSource,
- leaseContext);
+ leaseContext,
+ false);
}
@Override
@@ -173,7 +174,8 @@ public class FlinkSource<OUT>
lakeSource,
new LeaseContext(
sourceEnumeratorState.getLeaseId(),
- leaseContext.getKvSnapshotLeaseDurationMs()));
+ leaseContext.getKvSnapshotLeaseDurationMs()),
+ true);
}
@Override
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 79ad2e1ab..0b1f549ab 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -31,6 +31,7 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.lake.LakeSplitGenerator;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
+import org.apache.fluss.flink.source.FlinkSource;
import org.apache.fluss.flink.source.event.FinishedKvSnapshotConsumeEvent;
import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
@@ -156,6 +157,27 @@ public class FlinkSourceEnumerator
private volatile boolean closed = false;
+ /**
+ * Whether a checkpoint has been successfully completed before.
+ *
+ * <p>This flag is used in {@link #close()} to decide whether the kv
snapshot lease should be
+ * dropped:
+ *
+ * <ul>
+ * <li>If {@code false} (no checkpoint completed), the lease ID has not
been persisted to
+ * checkpoint state, so it is safe to drop the lease on close — no
future restore will
+ * reference it.
+ * <li>If {@code true} (at least one checkpoint completed), the lease ID
has been persisted
+ * and may be restored via {@link FlinkSource#restoreEnumerator}.
Dropping the lease on
+ * close would invalidate the restored lease, so it must be kept.
+ * </ul>
+ *
+ * <p>This field is initialized to {@code true} when restoring from a
checkpoint (i.e., when the
+ * enumerator is created via {@link FlinkSource#restoreEnumerator}), and
set to {@code true} in
+ * {@link #notifyCheckpointComplete(long)} upon the first successful
checkpoint.
+ */
+ private volatile boolean checkpointTriggeredBefore;
+
@Nullable private final Predicate partitionFilters;
@Nullable private final LakeSource<LakeSplit> lakeSource;
@@ -171,7 +193,8 @@ public class FlinkSourceEnumerator
boolean streaming,
@Nullable Predicate partitionFilters,
@Nullable LakeSource<LakeSplit> lakeSource,
- LeaseContext leaseContext) {
+ LeaseContext leaseContext,
+ boolean checkpointTriggeredBefore) {
this(
tablePath,
flussConf,
@@ -186,7 +209,8 @@ public class FlinkSourceEnumerator
streaming,
partitionFilters,
lakeSource,
- leaseContext);
+ leaseContext,
+ checkpointTriggeredBefore);
}
public FlinkSourceEnumerator(
@@ -203,7 +227,8 @@ public class FlinkSourceEnumerator
boolean streaming,
@Nullable Predicate partitionFilters,
@Nullable LakeSource<LakeSplit> lakeSource,
- LeaseContext leaseContext) {
+ LeaseContext leaseContext,
+ boolean checkpointTriggeredBefore) {
this(
tablePath,
flussConf,
@@ -219,7 +244,8 @@ public class FlinkSourceEnumerator
partitionFilters,
lakeSource,
new WorkerExecutor(context),
- leaseContext);
+ leaseContext,
+ checkpointTriggeredBefore);
}
FlinkSourceEnumerator(
@@ -237,7 +263,8 @@ public class FlinkSourceEnumerator
@Nullable Predicate partitionFilters,
@Nullable LakeSource<LakeSplit> lakeSource,
WorkerExecutor workerExecutor,
- LeaseContext leaseContext) {
+ LeaseContext leaseContext,
+ boolean checkpointTriggeredBefore) {
this.tablePath = checkNotNull(tablePath);
this.flussConf = checkNotNull(flussConf);
this.hasPrimaryKey = hasPrimaryKey;
@@ -259,6 +286,7 @@ public class FlinkSourceEnumerator
this.lakeSource = lakeSource;
this.workerExecutor = workerExecutor;
this.leaseContext = leaseContext;
+ this.checkpointTriggeredBefore = checkpointTriggeredBefore;
}
@Override
@@ -1007,6 +1035,8 @@ public class FlinkSourceEnumerator
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ checkpointTriggeredBefore = true;
+
// lower than this checkpoint id.
Set<TableBucket> consumedKvSnapshots =
getAndRemoveConsumedBucketsUpTo(checkpointId);
@@ -1051,26 +1081,18 @@ public class FlinkSourceEnumerator
@Override
public void close() throws IOException {
try {
- closed = true;
+ maybeDropKvSnapshotLease();
- if (!streaming
- && hasPrimaryKey
- && startingOffsetsInitializer instanceof
SnapshotOffsetsInitializer) {
- // drop the kv snapshot lease for the batch mode.
- flussAdmin
- .createKvSnapshotLease(
- leaseContext.getKvSnapshotLeaseId(),
- leaseContext.getKvSnapshotLeaseDurationMs())
- .dropLease()
- .get();
- }
+ closed = true;
if (workerExecutor != null) {
workerExecutor.close();
}
+
if (flussAdmin != null) {
flussAdmin.close();
}
+
if (connection != null) {
connection.close();
}
@@ -1079,6 +1101,28 @@ public class FlinkSourceEnumerator
}
}
+ private void maybeDropKvSnapshotLease() throws Exception {
+ if (flussAdmin != null
+ && hasPrimaryKey
+ && startingOffsetsInitializer instanceof
SnapshotOffsetsInitializer
+ && !checkpointTriggeredBefore) {
+ // 1. Drop the kv snapshot lease for the batch mode.
+ // 2. For streaming mode, if no checkpoint was triggered, the
lease ID
+ // has not been persisted to state. It won't be restored on
restart,
+ // so it's safe to drop it now.
+ LOG.info(
+ "Dropping kv snapshot lease {} when source enumerator
close. isStreaming {}",
+ leaseContext.getKvSnapshotLeaseId(),
+ streaming);
+ flussAdmin
+ .createKvSnapshotLease(
+ leaseContext.getKvSnapshotLeaseId(),
+ leaseContext.getKvSnapshotLeaseDurationMs())
+ .dropLease()
+ .get();
+ }
+ }
+
// --------------- private class ---------------
/** A container class to hold the newly added partitions and removed
partitions. */
private static class PartitionChange {
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index 935190092..d13dcc8db 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -372,9 +372,7 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]",
"+I[3, v3]");
org.apache.flink.util.CloseableIterator<Row> rowIter =
- tEnv.executeSql(
- "select * from pk_table_with_kv_snapshot_lease
/*+ OPTIONS('scan.kv.snapshot.lease.id' = 'test-consumer-1') */")
- .collect();
+ tEnv.executeSql("select * from
pk_table_with_kv_snapshot_lease").collect();
assertResultsIgnoreOrder(rowIter, expectedRows, false);
// now, we put rows to the table again, should read the log
@@ -393,12 +391,32 @@ abstract class FlinkTableSourceITCase extends
AbstractTestBase {
ZooKeeperClient zkClient =
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
retry(
Duration.ofMinutes(1),
- () -> {
-
assertThat(zkClient.getKvSnapshotLeaseMetadata("test-consumer-1"))
- .isNotPresent();
-
assertThat(zkClient.getKvSnapshotLeasesList().contains("test-consumer-1"))
- .isFalse();
- });
+ () ->
assertThat(zkClient.getKvSnapshotLeasesList().isEmpty()).isTrue());
+ }
+
+ @Test
+ void testReadWithKvSnapshotLeaseNoCheckpoint() throws Exception {
+ tEnv.executeSql(
+ "create table pk_table_with_kv_snapshot_lease2 (a int not null
primary key not enforced, b varchar)");
+ TablePath tablePath = TablePath.of(DEFAULT_DB,
"pk_table_with_kv_snapshot_lease2");
+
+ List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"),
row(3, "v3"));
+
+ // write records
+ writeRows(conn, tablePath, rows, false);
+
+ FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
+ List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]",
"+I[3, v3]");
+ org.apache.flink.util.CloseableIterator<Row> rowIter =
+ tEnv.executeSql("select * from
pk_table_with_kv_snapshot_lease2").collect();
+ assertResultsIgnoreOrder(rowIter, expectedRows, true);
+
+ // check lease will be dropped after job finished.
+ ZooKeeperClient zkClient =
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+ retry(
+ Duration.ofMinutes(1),
+ () ->
assertThat(zkClient.getKvSnapshotLeasesList().isEmpty()).isTrue());
}
//
-------------------------------------------------------------------------------------
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index eb526e4e5..c89514243 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -115,7 +115,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
streaming,
null,
null,
- LeaseContext.DEFAULT);
+ LeaseContext.DEFAULT,
+ false);
enumerator.start();
@@ -164,7 +165,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
streaming,
null,
null,
- LeaseContext.DEFAULT);
+ LeaseContext.DEFAULT,
+ false);
enumerator.start();
// register all read
for (int i = 0; i < numSubtasks; i++) {
@@ -237,7 +239,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
streaming,
null,
null,
- LeaseContext.DEFAULT);
+ LeaseContext.DEFAULT,
+ false);
enumerator.start();
@@ -285,7 +288,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
streaming,
null,
null,
- LeaseContext.DEFAULT);
+ LeaseContext.DEFAULT,
+ false);
enumerator.start();
@@ -323,7 +327,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
streaming,
null,
null,
- LeaseContext.DEFAULT);
+ LeaseContext.DEFAULT,
+ false);
enumerator.start();
@@ -385,7 +390,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
streaming,
null,
null,
- LeaseContext.DEFAULT);
+ LeaseContext.DEFAULT,
+ true);
enumerator.start();
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
@@ -435,7 +441,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
null,
null,
workExecutor,
- LeaseContext.DEFAULT)) {
+ LeaseContext.DEFAULT,
+ false)) {
Map<Long, String> partitionNameByIds =
waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
@@ -553,7 +560,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
streaming,
null,
null,
- LeaseContext.DEFAULT)) {
+ LeaseContext.DEFAULT,
+ false)) {
// test splits for same non-partitioned bucket, should assign to
same task
TableBucket t1 = new TableBucket(tableId, 0);
@@ -667,7 +675,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
null,
lakeSource,
workExecutor,
- LeaseContext.DEFAULT)) {
+ LeaseContext.DEFAULT,
+ false)) {
enumerator.start();
// Remove the hybrid partition to mock expire after enumerator
start