This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 49c27a833 [Bug] CheckPoint Failure Options alert not work in specific situation (#4256)
49c27a833 is described below

commit 49c27a833d58d267e5f8819c86942e3be54f5a2b
Author: Gianzie <[email protected]>
AuthorDate: Sat Jun 21 22:18:42 2025 +0800

    [Bug] CheckPoint Failure Options alert not work in specific situation (#4256)
---
 .../console/core/component/FlinkCheckpointProcessor.java      | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
index 264bab8b7..b2419be39 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/component/FlinkCheckpointProcessor.java
@@ -111,14 +111,15 @@ public class FlinkCheckpointProcessor {
                                          FlinkApplication application, 
@Nonnull CheckPoints.CheckPoint checkPoint,
                                          Long appId) {
         Counter counter = checkPointFailedCache.get(appId);
-        if (counter == null) {
-            checkPointFailedCache.put(appId, new 
Counter(checkPoint.getTriggerTimestamp()));
+        Long ckTriggerTimestamp = checkPoint.getTriggerTimestamp();
+        Integer cpFailureRateInterval = application.getCpFailureRateInterval();
+        if (counter == null || counter.getDuration(ckTriggerTimestamp) >= 
cpFailureRateInterval) {
+            checkPointFailedCache.put(appId, new Counter(ckTriggerTimestamp));
             return;
         }
 
-        long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
-        if (minute > application.getCpFailureRateInterval()
-            || counter.getCount() < application.getCpMaxFailureInterval()) {
+        long minute = counter.getDuration(ckTriggerTimestamp);
+        if (minute > cpFailureRateInterval || counter.getCount() < 
application.getCpMaxFailureInterval()) {
             counter.increment();
             return;
         }

Reply via email to