This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit a65a6132b656c03318e31e23683fdcf95186a016 Author: yunhong <[email protected]> AuthorDate: Tue Feb 10 20:42:22 2026 +0800 [flink] Drop kv snapshot lease when no checkpoint triggered before (#2610) --- .../org/apache/fluss/flink/source/FlinkSource.java | 6 +- .../source/enumerator/FlinkSourceEnumerator.java | 78 +++++++++++++++++----- .../fluss/flink/source/FlinkTableSourceITCase.java | 36 +++++++--- .../enumerator/FlinkSourceEnumeratorTest.java | 27 +++++--- 4 files changed, 110 insertions(+), 37 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java index fba394edf..0aeb0d59c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java @@ -150,7 +150,8 @@ public class FlinkSource<OUT> streaming, partitionFilters, lakeSource, - leaseContext); + leaseContext, + false); } @Override @@ -173,7 +174,8 @@ public class FlinkSource<OUT> lakeSource, new LeaseContext( sourceEnumeratorState.getLeaseId(), - leaseContext.getKvSnapshotLeaseDurationMs())); + leaseContext.getKvSnapshotLeaseDurationMs()), + true); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 79ad2e1ab..0b1f549ab 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -31,6 +31,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.lake.LakeSplitGenerator; import 
org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; import org.apache.fluss.flink.lake.split.LakeSnapshotSplit; +import org.apache.fluss.flink.source.FlinkSource; import org.apache.fluss.flink.source.event.FinishedKvSnapshotConsumeEvent; import org.apache.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent; import org.apache.fluss.flink.source.event.PartitionsRemovedEvent; @@ -156,6 +157,27 @@ public class FlinkSourceEnumerator private volatile boolean closed = false; + /** + * Whether a checkpoint has been successfully completed before. + * + * <p>This flag is used in {@link #close()} to decide whether the kv snapshot lease should be + * dropped: + * + * <ul> + * <li>If {@code false} (no checkpoint completed), the lease ID has not been persisted to + * checkpoint state, so it is safe to drop the lease on close — no future restore will + * reference it. + * <li>If {@code true} (at least one checkpoint completed), the lease ID has been persisted + * and may be restored via {@link FlinkSource#restoreEnumerator}. Dropping the lease on + * close would invalidate the restored lease, so it must be kept. + * </ul> + * + * <p>This field is initialized to {@code true} when restoring from a checkpoint (passed as a + * constructor argument from {@link FlinkSource#restoreEnumerator}), and set to {@code true} in {@link + * #notifyCheckpointComplete(long)} upon the first successful checkpoint. 
+ */ + private volatile boolean checkpointTriggeredBefore; + @Nullable private final Predicate partitionFilters; @Nullable private final LakeSource<LakeSplit> lakeSource; @@ -171,7 +193,8 @@ public class FlinkSourceEnumerator boolean streaming, @Nullable Predicate partitionFilters, @Nullable LakeSource<LakeSplit> lakeSource, - LeaseContext leaseContext) { + LeaseContext leaseContext, + boolean checkpointTriggeredBefore) { this( tablePath, flussConf, @@ -186,7 +209,8 @@ public class FlinkSourceEnumerator streaming, partitionFilters, lakeSource, - leaseContext); + leaseContext, + checkpointTriggeredBefore); } public FlinkSourceEnumerator( @@ -203,7 +227,8 @@ public class FlinkSourceEnumerator boolean streaming, @Nullable Predicate partitionFilters, @Nullable LakeSource<LakeSplit> lakeSource, - LeaseContext leaseContext) { + LeaseContext leaseContext, + boolean checkpointTriggeredBefore) { this( tablePath, flussConf, @@ -219,7 +244,8 @@ public class FlinkSourceEnumerator partitionFilters, lakeSource, new WorkerExecutor(context), - leaseContext); + leaseContext, + checkpointTriggeredBefore); } FlinkSourceEnumerator( @@ -237,7 +263,8 @@ public class FlinkSourceEnumerator @Nullable Predicate partitionFilters, @Nullable LakeSource<LakeSplit> lakeSource, WorkerExecutor workerExecutor, - LeaseContext leaseContext) { + LeaseContext leaseContext, + boolean checkpointTriggeredBefore) { this.tablePath = checkNotNull(tablePath); this.flussConf = checkNotNull(flussConf); this.hasPrimaryKey = hasPrimaryKey; @@ -259,6 +286,7 @@ public class FlinkSourceEnumerator this.lakeSource = lakeSource; this.workerExecutor = workerExecutor; this.leaseContext = leaseContext; + this.checkpointTriggeredBefore = checkpointTriggeredBefore; } @Override @@ -1007,6 +1035,8 @@ public class FlinkSourceEnumerator @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { + checkpointTriggeredBefore = true; + // lower than this checkpoint id. 
Set<TableBucket> consumedKvSnapshots = getAndRemoveConsumedBucketsUpTo(checkpointId); @@ -1051,26 +1081,18 @@ public class FlinkSourceEnumerator @Override public void close() throws IOException { try { - closed = true; + maybeDropKvSnapshotLease(); - if (!streaming - && hasPrimaryKey - && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) { - // drop the kv snapshot lease for the batch mode. - flussAdmin - .createKvSnapshotLease( - leaseContext.getKvSnapshotLeaseId(), - leaseContext.getKvSnapshotLeaseDurationMs()) - .dropLease() - .get(); - } + closed = true; if (workerExecutor != null) { workerExecutor.close(); } + if (flussAdmin != null) { flussAdmin.close(); } + if (connection != null) { connection.close(); } @@ -1079,6 +1101,28 @@ public class FlinkSourceEnumerator } } + private void maybeDropKvSnapshotLease() throws Exception { + if (flussAdmin != null + && hasPrimaryKey + && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer + && !checkpointTriggeredBefore) { + // 1. Drop the kv snapshot lease for the batch mode. + // 2. For streaming mode, if no checkpoint was triggered, the lease ID + // has not been persisted to state. It won't be restored on restart, + // so it's safe to drop it now. + LOG.info( + "Dropping kv snapshot lease {} when source enumerator close. isStreaming {}", + leaseContext.getKvSnapshotLeaseId(), + streaming); + flussAdmin + .createKvSnapshotLease( + leaseContext.getKvSnapshotLeaseId(), + leaseContext.getKvSnapshotLeaseDurationMs()) + .dropLease() + .get(); + } + } + // --------------- private class --------------- /** A container class to hold the newly added partitions and removed partitions. 
*/ private static class PartitionChange { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java index 935190092..d13dcc8db 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java @@ -372,9 +372,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]"); org.apache.flink.util.CloseableIterator<Row> rowIter = - tEnv.executeSql( - "select * from pk_table_with_kv_snapshot_lease /*+ OPTIONS('scan.kv.snapshot.lease.id' = 'test-consumer-1') */") - .collect(); + tEnv.executeSql("select * from pk_table_with_kv_snapshot_lease").collect(); assertResultsIgnoreOrder(rowIter, expectedRows, false); // now, we put rows to the table again, should read the log @@ -393,12 +391,32 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); retry( Duration.ofMinutes(1), - () -> { - assertThat(zkClient.getKvSnapshotLeaseMetadata("test-consumer-1")) - .isNotPresent(); - assertThat(zkClient.getKvSnapshotLeasesList().contains("test-consumer-1")) - .isFalse(); - }); + () -> assertThat(zkClient.getKvSnapshotLeasesList().isEmpty()).isTrue()); + } + + @Test + void testReadWithKvSnapshotLeaseNoCheckpoint() throws Exception { + tEnv.executeSql( + "create table pk_table_with_kv_snapshot_lease2 (a int not null primary key not enforced, b varchar)"); + TablePath tablePath = TablePath.of(DEFAULT_DB, "pk_table_with_kv_snapshot_lease2"); + + List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")); + + // write records + writeRows(conn, tablePath, rows, false); + + 
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath); + + List<String> expectedRows = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]"); + org.apache.flink.util.CloseableIterator<Row> rowIter = + tEnv.executeSql("select * from pk_table_with_kv_snapshot_lease2").collect(); + assertResultsIgnoreOrder(rowIter, expectedRows, true); + + // check lease will be dropped after job finished. + ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + retry( + Duration.ofMinutes(1), + () -> assertThat(zkClient.getKvSnapshotLeasesList().isEmpty()).isTrue()); } // ------------------------------------------------------------------------------------- diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index eb526e4e5..c89514243 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -115,7 +115,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { streaming, null, null, - LeaseContext.DEFAULT); + LeaseContext.DEFAULT, + false); enumerator.start(); @@ -164,7 +165,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { streaming, null, null, - LeaseContext.DEFAULT); + LeaseContext.DEFAULT, + false); enumerator.start(); // register all read for (int i = 0; i < numSubtasks; i++) { @@ -237,7 +239,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { streaming, null, null, - LeaseContext.DEFAULT); + LeaseContext.DEFAULT, + false); enumerator.start(); @@ -285,7 +288,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { streaming, null, null, - LeaseContext.DEFAULT); + LeaseContext.DEFAULT, + false); enumerator.start(); @@ -323,7 +327,8 @@ class 
FlinkSourceEnumeratorTest extends FlinkTestBase { streaming, null, null, - LeaseContext.DEFAULT); + LeaseContext.DEFAULT, + false); enumerator.start(); @@ -385,7 +390,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { streaming, null, null, - LeaseContext.DEFAULT); + LeaseContext.DEFAULT, + true); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -435,7 +441,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { null, null, workExecutor, - LeaseContext.DEFAULT)) { + LeaseContext.DEFAULT, + false)) { Map<Long, String> partitionNameByIds = waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH); @@ -553,7 +560,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { streaming, null, null, - LeaseContext.DEFAULT)) { + LeaseContext.DEFAULT, + false)) { // test splits for same non-partitioned bucket, should assign to same task TableBucket t1 = new TableBucket(tableId, 0); @@ -667,7 +675,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { null, lakeSource, workExecutor, - LeaseContext.DEFAULT)) { + LeaseContext.DEFAULT, + false)) { enumerator.start(); // Remove the hybrid partition to mock expire after enumerator start
