This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e5441698 [hotfix][flink] Fix release kv snapshot lease for log table (#2719)
4e5441698 is described below

commit 4e54416986b31bd75900e5ba4cedd02e63003579
Author: Liebing <[email protected]>
AuthorDate: Thu Feb 26 12:17:28 2026 +0800

    [hotfix][flink] Fix release kv snapshot lease for log table (#2719)
---
 .../source/enumerator/FlinkSourceEnumerator.java   | 63 ++++++++++++----------
 1 file changed, 34 insertions(+), 29 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index e4e800c67..b537a8122 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -1054,37 +1054,42 @@ public class FlinkSourceEnumerator
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         checkpointTriggeredBefore = true;
 
-        // lower than this checkpoint id.
-        Set<TableBucket> consumedKvSnapshots = getAndRemoveConsumedBucketsUpTo(checkpointId);
+        if (hasPrimaryKey) {
+            // lower than this checkpoint id.
+            Set<TableBucket> consumedKvSnapshots = getAndRemoveConsumedBucketsUpTo(checkpointId);
 
-        LOG.info(
-                "kv snapshot has already consumed and try to release kv snapshot lease for: {}, checkpoint id: {}",
-                consumedKvSnapshots,
-                checkpointId);
+            if (!consumedKvSnapshots.isEmpty()) {
+                LOG.info(
+                        "kv snapshot has already consumed and try to release kv snapshot lease for: {}, checkpoint id: {}",
+                        consumedKvSnapshots,
+                        checkpointId);
 
-        // send request to fluss to unregister the kv snapshot lease.
-        try {
-            flussAdmin
-                    .createKvSnapshotLease(
-                            leaseContext.getKvSnapshotLeaseId(),
-                            leaseContext.getKvSnapshotLeaseDurationMs())
-                    .releaseSnapshots(consumedKvSnapshots)
-                    .get();
-        } catch (Exception e) {
-            if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) {
-                LOG.warn(
-                        "Failed to release kv snapshot lease because the server does not support "
-                                + "kv snapshot lease API. Snapshots may remain in storage longer "
-                                + "than necessary. Please upgrade the Fluss server to version 0.9 "
-                                + "or later.",
-                        e);
-            } else {
-                LOG.error(
-                        "Failed to release kv snapshot lease. These snapshots need to re-enqueue",
-                        e);
-                // use the current checkpoint id to re-enqueue the buckets
-                consumedKvSnapshots.forEach(
-                        tableBucket -> addConsumedBucket(checkpointId, tableBucket));
+                // send request to fluss to unregister the kv snapshot lease.
+                try {
+                    flussAdmin
+                            .createKvSnapshotLease(
+                                    leaseContext.getKvSnapshotLeaseId(),
+                                    leaseContext.getKvSnapshotLeaseDurationMs())
+                            .releaseSnapshots(consumedKvSnapshots)
+                            .get();
+                } catch (Exception e) {
+                    if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class)
+                            .isPresent()) {
+                        LOG.warn(
+                                "Failed to release kv snapshot lease because the server does not support "
+                                        + "kv snapshot lease API. Snapshots may remain in storage longer "
+                                        + "than necessary. Please upgrade the Fluss server to version 0.9 "
+                                        + "or later.",
+                                e);
+                    } else {
+                        LOG.error(
+                                "Failed to release kv snapshot lease. These snapshots need to re-enqueue",
+                                e);
+                        // use the current checkpoint id to re-enqueue the buckets
+                        consumedKvSnapshots.forEach(
+                                tableBucket -> addConsumedBucket(checkpointId, tableBucket));
+                    }
+                }
             }
         }
     }

Reply via email to