This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 4b9544e7a [Improve] flink job restore state from checkpoint bug fixed. 
#3749 (#3767)
4b9544e7a is described below

commit 4b9544e7a7ef60f528fa27c1240db50c3343f4e3
Author: benjobs <[email protected]>
AuthorDate: Tue Jun 18 18:46:07 2024 +0800

    [Improve] flink job restore state from checkpoint bug fixed. #3749 (#3767)
    
    Co-authored-by: benjobs <[email protected]>
---
 .../console/core/service/impl/ApplicationServiceImpl.java    |  5 +++++
 .../streampark/console/core/task/CheckpointProcessor.java    | 12 ++++++++++++
 2 files changed, 17 insertions(+)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 9fedb6083..33affd619 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1695,6 +1695,11 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           // 3) success
           applicationLog.setSuccess(true);
 
+          // issue: https://github.com/apache/incubator-streampark/issues/3749
+          if (appParam.getSavePointed() == null || !appParam.getSavePointed()) 
{
+            checkpointProcessor.resetCheckpointNum(appParam.getId());
+          }
+
           if (response.flinkConfig() != null) {
             String jmMemory = 
response.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
             if (jmMemory != null) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 96f4c2cfa..e08d37102 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -131,6 +131,18 @@ public class CheckpointProcessor {
     }
   }
 
+  // issue: https://github.com/apache/incubator-streampark/issues/3749
+  // Drops every checkPointCache entry whose key begins with the given application id.
+  // NOTE(review): prefix match also hits ids like 10 when appId is 1; verify key format.
+  public void resetCheckpointNum(Long appId) {
+    for (String cacheKey : checkPointCache.asMap().keySet()) {
+      if (!cacheKey.startsWith(appId.toString())) {
+        continue;
+      }
+      checkPointCache.invalidate(cacheKey);
+    }
+  }
+
   // Returns true when the checkpoint is not a savepoint and either latestId is null
   // or latestId is smaller than the checkpoint's own id.
   private boolean checkSaveAsCheckpoint(@Nonnull CheckPoints.CheckPoint 
 checkPoint, Long latestId) {
     return !checkPoint.getIsSavepoint() && (latestId == null || latestId < 
 checkPoint.getId());
   }

Reply via email to