This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new bd2f754c4 [Fix-3153] Alert information cannot be sent in cancelling 
state (#3157)
bd2f754c4 is described below

commit bd2f754c4c0f80d03741a71761cb2159a0080ebf
Author: xiangzihao <[email protected]>
AuthorDate: Wed Oct 11 21:32:24 2023 -0500

    [Fix-3153] Alert information cannot be sent in cancelling state (#3157)
    
    * fix issue 3153
    
    * optimize alert logic
    
    * fix spotless check
---
 .../console/core/watcher/FlinkAppHttpWatcher.java  | 30 ++++++++++++++++------
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 16bf1c0ae..5bea3a165 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -307,15 +307,29 @@ public class FlinkAppHttpWatcher {
 
     if (flag != null) {
       log.info("FlinkAppHttpWatcher previous state: canceling.");
-      if (stopFromEnum.isNone()) {
-        log.error(
-            "FlinkAppHttpWatcher query previous state was canceling and 
stopFrom NotFound,savePoint expired!");
-        savePointService.expire(application.getId());
+      FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.CANCELED;
+      try {
+        YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
+        if (yarnAppInfo != null) {
+          String state = yarnAppInfo.getApp().getFinalStatus();
+          flinkAppStateEnum = FlinkAppStateEnum.of(state);
+        }
+      } finally {
+        if (stopFromEnum.isNone()) {
+          log.error(
+              "FlinkAppHttpWatcher query previous state was canceling and 
stopFrom NotFound,savePoint expired!");
+          savePointService.expire(application.getId());
+          if (flinkAppStateEnum == FlinkAppStateEnum.KILLED
+              || flinkAppStateEnum == FlinkAppStateEnum.FAILED) {
+            doAlert(application, flinkAppStateEnum);
+          }
+        }
+        application.setState(flinkAppStateEnum.getValue());
+        cleanSavepoint(application);
+        cleanOptioning(optionStateEnum, application.getId());
+        doPersistMetrics(application, true);
       }
-      application.setState(FlinkAppStateEnum.CANCELED.getValue());
-      cleanSavepoint(application);
-      cleanOptioning(optionStateEnum, application.getId());
-      doPersistMetrics(application, true);
+
     } else {
       // query the status from the yarn rest Api
       YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);

Reply via email to