This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new ffb23b983 [Fix-3153] Alert information cannot be sent in cancelling state (#3236)
ffb23b983 is described below

commit ffb23b983b611446dcbf32feeab63ef84a3fd631
Author: xiangzihao <[email protected]>
AuthorDate: Thu Oct 12 08:49:54 2023 -0500

    [Fix-3153] Alert information cannot be sent in cancelling state (#3236)
    
    * fix issue 3153
    
    * fix spotless error
---
 .../console/core/task/FlinkRESTAPIWatcher.java     | 29 ++++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index 4ebbf75cd..b734a7a03 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -501,15 +501,28 @@ public class FlinkRESTAPIWatcher {
     Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
     if (flag != null) {
       log.info("FlinkRESTAPIWatcher previous state: canceling.");
-      if (StopFrom.NONE.equals(stopFrom)) {
-        log.error(
-            "FlinkRESTAPIWatcher query previous state was canceling and 
stopFrom NotFound,savePoint expired!");
-        savePointService.expire(application.getId());
+      FlinkAppState flinkAppState = FlinkAppState.CANCELED;
+      try {
+        YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
+        if (yarnAppInfo != null) {
+          String state = yarnAppInfo.getApp().getFinalStatus();
+          flinkAppState = FlinkAppState.of(state);
+        }
+      } finally {
+        if (StopFrom.NONE.equals(stopFrom)) {
+          log.error(
+              "FlinkRESTAPIWatcher query previous state was canceling and 
stopFrom NotFound,savePoint expired!");
+          savePointService.expire(application.getId());
+          if (flinkAppState == FlinkAppState.KILLED || flinkAppState == 
FlinkAppState.FAILED) {
+            alertService.alert(application, flinkAppState);
+          }
+        }
+        application.setState(flinkAppState.getValue());
+        cleanSavepoint(application);
+        cleanOptioning(optionState, application.getId());
+        doPersistMetrics(application, true);
       }
-      application.setState(FlinkAppState.CANCELED.getValue());
-      cleanSavepoint(application);
-      cleanOptioning(optionState, application.getId());
-      doPersistMetrics(application, true);
+
     } else {
       // query the status from the yarn rest Api
       YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);

Reply via email to