This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new b828b5059 [Bug] save checkpoint bug fixed. (#4002)
b828b5059 is described below

commit b828b5059807bb36b18885ea2e839c55a2a98a2b
Author: benjobs <[email protected]>
AuthorDate: Thu Aug 29 00:28:09 2024 +0800

    [Bug] save checkpoint bug fixed. (#4002)
    
    * [Bug] checkpoint chk-id bug fixed.
---
 .../core/service/impl/ApplicationServiceImpl.java  |  6 ---
 .../console/core/task/CheckpointProcessor.java     | 60 +++++++++++-----------
 2 files changed, 29 insertions(+), 37 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 1e1d5ae8f..8686a68d1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1702,12 +1702,6 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           // 3) success
           applicationLog.setSuccess(true);
 
-          // issue: https://github.com/apache/incubator-streampark/issues/3749
-          if (appParam.getRestoreOrTriggerSavepoint() == null
-              || !appParam.getRestoreOrTriggerSavepoint()) {
-            checkpointProcessor.resetCheckpointNum(appParam.getId());
-          }
-
           if (response.flinkConfig() != null) {
             String jmMemory = 
response.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
             if (jmMemory != null) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 2f6f5bb90..22451eb5c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -83,19 +83,29 @@ public class CheckpointProcessor {
     CheckPointKey checkPointKey = new CheckPointKey(appId, jobID, 
checkPoint.getId());
 
     if (CheckPointStatus.COMPLETED.equals(status)) {
-      if (checkSaveAsSavepoint(checkPointKey, checkPoint)) {
-        savepointedCache.put(checkPointKey.getSavePointId(), 
DEFAULT_FLAG_BYTE);
-        saveSavepoint(checkPoint, application.getId());
-        flinkAppHttpWatcher.cleanSavepoint(application);
-        return;
+      switch (checkPoint.getCheckPointType()) {
+        case SAVEPOINT:
+          if (checkSaveForSavepoint(checkPointKey, checkPoint)) {
+            savepointedCache.put(checkPointKey.getSavePointId(), 
DEFAULT_FLAG_BYTE);
+            saveSavepoint(checkPoint, application.getId());
+            flinkAppHttpWatcher.cleanSavepoint(application);
+            return;
+          }
+          break;
+        case CHECKPOINT:
+          Long latestChkId = getLatestCheckpointId(appId, 
checkPointKey.getCheckPointId());
+          if (checkSaveForCheckpoint(checkPoint, latestChkId)) {
+            checkPointCache.put(checkPointKey.getCheckPointId(), 
checkPoint.getId());
+            saveSavepoint(checkPoint, application.getId());
+          }
+          break;
+        default:
+          break;
       }
+      return;
+    }
 
-      Long latestChkId = getLatestCheckpointId(appId, 
checkPointKey.getCheckPointId());
-      if (checkSaveAsCheckpoint(checkPoint, latestChkId)) {
-        checkPointCache.put(checkPointKey.getCheckPointId(), 
checkPoint.getId());
-        saveSavepoint(checkPoint, application.getId());
-      }
-    } else if (shouldProcessFailedTrigger(checkPoint, 
application.cpFailedTrigger(), status)) {
+    if (shouldProcessFailedTrigger(checkPoint, application.cpFailedTrigger(), 
status)) {
       Counter counter = checkPointFailedCache.get(appId);
       if (counter == null) {
         checkPointFailedCache.put(appId, new 
Counter(checkPoint.getTriggerTimestamp()));
@@ -131,27 +141,15 @@ public class CheckpointProcessor {
     }
   }
 
-  // issue: https://github.com/apache/incubator-streampark/issues/3749
-  public void resetCheckpointNum(Long appId) {
-    checkPointCache
-        .asMap()
-        .forEach(
-            (k, v) -> {
-              if (k.startsWith(appId.toString())) {
-                checkPointCache.invalidate(k);
-              }
-            });
-  }
-
-  private boolean checkSaveAsCheckpoint(@Nonnull CheckPoints.CheckPoint 
checkPoint, Long latestId) {
-    return !checkPoint.getIsSavepoint() && (latestId == null || latestId < 
checkPoint.getId());
+  private boolean checkSaveForCheckpoint(
+      @Nonnull CheckPoints.CheckPoint checkPoint, Long latestId) {
+    return checkPoint.getId().equals(1L)
+        || latestId == null
+        || !latestId.equals(checkPoint.getId());
   }
 
-  private boolean checkSaveAsSavepoint(
+  private boolean checkSaveForSavepoint(
       CheckPointKey checkPointKey, @Nonnull CheckPoints.CheckPoint checkPoint) 
{
-    if (!checkPoint.getIsSavepoint()) {
-      return false;
-    }
     return savepointedCache.getIfPresent(checkPointKey.getSavePointId()) == 
null
         // If the savepoint triggered before SAVEPOINT_CACHE_HOUR span, we'll 
see it as out-of-time
         // savepoint and ignore it.
@@ -160,9 +158,9 @@ public class CheckpointProcessor {
   }
 
   @Nullable
-  private Long getLatestCheckpointId(Long appId, String cacheId) {
+  private Long getLatestCheckpointId(Long appId, String checkpointId) {
     return checkPointCache.get(
-        cacheId,
+        checkpointId,
         key -> {
           Savepoint savepoint = savepointService.getLatest(appId);
           return 
Optional.ofNullable(savepoint).map(Savepoint::getChkId).orElse(null);

Reply via email to