This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 4e5441698 [hotfix][flink] Fix release kv snapshot lease for log table (#2719)
4e5441698 is described below
commit 4e54416986b31bd75900e5ba4cedd02e63003579
Author: Liebing <[email protected]>
AuthorDate: Thu Feb 26 12:17:28 2026 +0800
[hotfix][flink] Fix release kv snapshot lease for log table (#2719)
---
.../source/enumerator/FlinkSourceEnumerator.java | 63 ++++++++++++----------
1 file changed, 34 insertions(+), 29 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index e4e800c67..b537a8122 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -1054,37 +1054,42 @@ public class FlinkSourceEnumerator
public void notifyCheckpointComplete(long checkpointId) throws Exception {
checkpointTriggeredBefore = true;
- // lower than this checkpoint id.
- Set<TableBucket> consumedKvSnapshots =
getAndRemoveConsumedBucketsUpTo(checkpointId);
+ if (hasPrimaryKey) {
+ // lower than this checkpoint id.
+ Set<TableBucket> consumedKvSnapshots =
getAndRemoveConsumedBucketsUpTo(checkpointId);
- LOG.info(
- "kv snapshot has already consumed and try to release kv
snapshot lease for: {}, checkpoint id: {}",
- consumedKvSnapshots,
- checkpointId);
+ if (!consumedKvSnapshots.isEmpty()) {
+ LOG.info(
+ "kv snapshot has already consumed and try to release
kv snapshot lease for: {}, checkpoint id: {}",
+ consumedKvSnapshots,
+ checkpointId);
- // send request to fluss to unregister the kv snapshot lease.
- try {
- flussAdmin
- .createKvSnapshotLease(
- leaseContext.getKvSnapshotLeaseId(),
- leaseContext.getKvSnapshotLeaseDurationMs())
- .releaseSnapshots(consumedKvSnapshots)
- .get();
- } catch (Exception e) {
- if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class).isPresent()) {
- LOG.warn(
- "Failed to release kv snapshot lease because the
server does not support "
- + "kv snapshot lease API. Snapshots may remain
in storage longer "
- + "than necessary. Please upgrade the Fluss
server to version 0.9 "
- + "or later.",
- e);
- } else {
- LOG.error(
- "Failed to release kv snapshot lease. These snapshots
need to re-enqueue",
- e);
- // use the current checkpoint id to re-enqueue the buckets
- consumedKvSnapshots.forEach(
- tableBucket -> addConsumedBucket(checkpointId,
tableBucket));
+ // send request to fluss to unregister the kv snapshot lease.
+ try {
+ flussAdmin
+ .createKvSnapshotLease(
+ leaseContext.getKvSnapshotLeaseId(),
+
leaseContext.getKvSnapshotLeaseDurationMs())
+ .releaseSnapshots(consumedKvSnapshots)
+ .get();
+ } catch (Exception e) {
+ if (ExceptionUtils.findThrowable(e,
UnsupportedVersionException.class)
+ .isPresent()) {
+ LOG.warn(
+ "Failed to release kv snapshot lease because
the server does not support "
+ + "kv snapshot lease API. Snapshots
may remain in storage longer "
+ + "than necessary. Please upgrade the
Fluss server to version 0.9 "
+ + "or later.",
+ e);
+ } else {
+ LOG.error(
+ "Failed to release kv snapshot lease. These
snapshots need to re-enqueue",
+ e);
+ // use the current checkpoint id to re-enqueue the
buckets
+ consumedKvSnapshots.forEach(
+ tableBucket -> addConsumedBucket(checkpointId,
tableBucket));
+ }
+ }
}
}
}