This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new a4f64c248 [Improve] trigger savepoint bug fixed.
a4f64c248 is described below

commit a4f64c248158956dc48009d64305425cea79770e
Author: benjobs <[email protected]>
AuthorDate: Wed Mar 6 23:51:21 2024 +0800

    [Improve] trigger savepoint bug fixed.
---
 .../console/core/service/impl/SavePointServiceImpl.java    |  1 -
 .../streampark/console/core/task/FlinkAppHttpWatcher.java  | 14 +++++++++++++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 67e828a13..687026e4a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -334,7 +334,6 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
             savepointResponse -> {
               if (savepointResponse != null && 
savepointResponse.savePointDir() != null) {
                 applicationLog.setSuccess(true);
-                flinkAppHttpWatcher.cleanSavepoint(application);
                 String savePointDir = savepointResponse.savePointDir();
                 log.info("Request savepoint successful, savepointDir: {}", 
savePointDir);
               }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index a5ee4c158..08a7f7902 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -449,6 +449,7 @@ public class FlinkAppHttpWatcher {
     } else {
       WATCHING_APPS.put(appId, application);
     }
+
     StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
     StateChangeEvent nowEvent = StateChangeEvent.of(application);
     if (!nowEvent.equals(event)) {
@@ -617,8 +618,12 @@ public class FlinkAppHttpWatcher {
   }
 
   public void cleanSavepoint(Application application) {
-    SAVEPOINT_CACHE.invalidate(application.getId());
     application.setOptionState(OptionState.NONE.getValue());
+    StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
+    if (event != null && event.getOptionState() == OptionState.SAVEPOINTING) {
+      doPersistMetrics(application, false);
+    }
+    SAVEPOINT_CACHE.invalidate(application.getId());
   }
 
   /** set current option state */
@@ -648,6 +653,13 @@ public class FlinkAppHttpWatcher {
     }
     log.info("[StreamPark][FlinkAppHttpWatcher] add app to 
savepoint,appId:{}", appId);
     SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
+
+    // update to PREVIOUS_STATUS
+    StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
+    if (event != null) {
+      event.setOptionState(OptionState.SAVEPOINTING);
+      PREVIOUS_STATUS.put(appId, event);
+    }
   }
 
   public static void unWatching(Long appId) {

Reply via email to