This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 45f169ab6 [Improve] Graceful stop flink job with savepoint (#3796)
45f169ab6 is described below

commit 45f169ab6627ada7a07aa8ce2f62aa31a7c2f7e5
Author: mzzx <[email protected]>
AuthorDate: Wed Jun 26 23:53:04 2024 +0800

    [Improve] Graceful stop flink job with savepoint (#3796)
    
    Co-authored-by: benjobs <[email protected]>
---
 .../core/service/application/impl/ApplicationInfoServiceImpl.java     | 3 ++-
 .../apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java   | 1 +
 .../org/apache/streampark/flink/client/trait/FlinkClientTrait.scala   | 4 ----
 3 files changed, 3 insertions(+), 5 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 5bf8c2157..14c708f4f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -245,7 +245,8 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
   @Override
   public boolean checkAlter(Application appParam) {
     Long appId = appParam.getId();
-    if (FlinkAppStateEnum.CANCELED != appParam.getStateEnum()) {
+    if (FlinkAppStateEnum.CANCELED != appParam.getStateEnum()
+        && FlinkAppStateEnum.FINISHED != appParam.getStateEnum()) {
       return false;
     }
     long cancelUserId = FlinkAppHttpWatcher.getCanceledJobUserId(appId);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index b0664d0be..d65d4f918 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -483,6 +483,7 @@ public class FlinkAppHttpWatcher {
         doPersistMetrics(application, false);
         break;
       case CANCELED:
+      case FINISHED:
         log.info(
             "[StreamPark][FlinkAppHttpWatcher] getFromFlinkRestApi, job state 
{}, stop tracking and delete stopFrom!",
             currentState.name());
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index d936980a9..d0a1bf14a 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -538,10 +538,6 @@ trait FlinkClientTrait extends Logger {
       case (false, false) =>
         client.cancel(jobID).get()
         null
-      case (true, false) =>
-        clientWrapper
-          .cancelWithSavepoint(jobID, savePointDir, cancelRequest.nativeFormat)
-          .get()
       case (_, _) =>
         clientWrapper
           .stopWithSavepoint(

Reply via email to