This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 03fffdaf2 [Improve] trigger savepoint improvements
03fffdaf2 is described below

commit 03fffdaf2be754c0a4f56070e394094a981c67df
Author: benjobs <[email protected]>
AuthorDate: Wed Mar 6 18:19:24 2024 +0800

    [Improve] trigger savepoint improvements
---
 .../console/core/entity/Application.java           |  1 +
 .../core/service/impl/SavePointServiceImpl.java    |  1 +
 .../console/core/task/FlinkAppHttpWatcher.java     | 24 +++++++++++++---------
 .../src/views/flink/app/hooks/useSavepoint.tsx     |  9 +++++---
 4 files changed, 22 insertions(+), 13 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index cda37c306..cb08078ef 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -120,6 +120,7 @@ public class Application implements Serializable {
   /** has restart count */
   private Integer restartCount;
 
+  @TableField(updateStrategy = FieldStrategy.IGNORED)
   private Integer optionState;
 
   /** alert id */
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 687026e4a..67e828a13 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -334,6 +334,7 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
             savepointResponse -> {
               if (savepointResponse != null && 
savepointResponse.savePointDir() != null) {
                 applicationLog.setSuccess(true);
+                flinkAppHttpWatcher.cleanSavepoint(application);
                 String savePointDir = savepointResponse.savePointDir();
                 log.info("Request savepoint successful, savepointDir: {}", 
savePointDir);
               }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 226c3a561..a5ee4c158 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -403,15 +403,16 @@ public class FlinkAppHttpWatcher {
      NEED_RESTART_AFTER_ROLLBACK (Need to restart after rollback)
     NEED_RESTART_AFTER_DEPLOY (Need to restart after deploy)
     */
+    Long appId = application.getId();
     if (OptionState.STARTING.equals(optionState)) {
-      Application latestApp = WATCHING_APPS.get(application.getId());
+      Application latestApp = WATCHING_APPS.get(appId);
       ReleaseState releaseState = latestApp.getReleaseState();
       switch (releaseState) {
         case NEED_RESTART:
         case NEED_ROLLBACK:
           LambdaUpdateWrapper<Application> updateWrapper =
               new LambdaUpdateWrapper<Application>()
-                  .eq(Application::getId, application.getId())
+                  .eq(Application::getId, appId)
                   .set(Application::getRelease, ReleaseState.DONE.get());
           applicationService.update(updateWrapper);
           break;
@@ -422,17 +423,18 @@ public class FlinkAppHttpWatcher {
 
     // The current state is running, and there is a current task in the savePointCache,
     // indicating that the task is doing savepoint
-    if (SAVEPOINT_CACHE.getIfPresent(application.getId()) != null) {
+    if (SAVEPOINT_CACHE.getIfPresent(appId) != null) {
       application.setOptionState(OptionState.SAVEPOINTING.getValue());
     } else {
       application.setOptionState(OptionState.NONE.getValue());
     }
     application.setState(currentState.getValue());
     doPersistMetrics(application, false);
-    cleanOptioning(optionState, application.getId());
+    cleanOptioning(optionState, appId);
   }
 
   private void doPersistMetrics(Application application, boolean stopWatch) {
+    Long appId = application.getId();
     if (FlinkAppState.isEndState(application.getState())) {
       application.setOverview(null);
       application.setTotalTM(null);
@@ -441,16 +443,16 @@ public class FlinkAppHttpWatcher {
       application.setAvailableSlot(null);
       application.setJmMemory(null);
       application.setTmMemory(null);
-      unWatching(application.getId());
+      unWatching(appId);
     } else if (stopWatch) {
-      unWatching(application.getId());
+      unWatching(appId);
     } else {
-      WATCHING_APPS.put(application.getId(), application);
+      WATCHING_APPS.put(appId, application);
     }
-    StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(application.getId());
+    StateChangeEvent event = PREVIOUS_STATUS.getIfPresent(appId);
     StateChangeEvent nowEvent = StateChangeEvent.of(application);
     if (!nowEvent.equals(event)) {
-      PREVIOUS_STATUS.put(application.getId(), nowEvent);
+      PREVIOUS_STATUS.put(appId, nowEvent);
       applicationService.persistMetrics(application);
     }
   }
@@ -831,10 +833,12 @@ public class FlinkAppHttpWatcher {
         return false;
       }
       StateChangeEvent that = (StateChangeEvent) object;
+      if (optionState != that.optionState) {
+        return false;
+      }
       return Objects.equals(id, that.id)
           && Objects.equals(jobId, that.jobId)
           && appState == that.appState
-          && optionState == that.optionState
           && Objects.equals(jobManagerUrl, that.jobManagerUrl);
     }
 
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
index 106c33cdc..ea0ddb326 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useSavepoint.tsx
@@ -90,7 +90,7 @@ export const useSavepoint = (updateOption: Fn) => {
                 savePoint: unref(customSavepoint),
               });
               if (data.data === false) {
-                createErrorSwal('custom savepoint path is invalid, ' + 
data.message);
+                await createErrorSwal('custom savepoint path is invalid, ' + 
data.message);
                 reject('custom savepoint path is invalid');
               } else {
                 await handleSavepointAction(savepointReq);
@@ -104,8 +104,11 @@ export const useSavepoint = (updateOption: Fn) => {
               }
             } else {
               const { data } = await fetchCheckSavepointPath({ id: appId.value 
});
-              if (data.data) await handleSavepointAction(savepointReq);
-              else createErrorSwal(data.message);
+              if (data.data) {
+                await handleSavepointAction(savepointReq);
+              } else {
+                await createErrorSwal(data.message);
+              }
               reject();
             }
           } catch (error) {

Reply via email to