This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new ffb23b983 [Fix-3153] Alert information cannot be sent in cancelling
state (#3236)
ffb23b983 is described below
commit ffb23b983b611446dcbf32feeab63ef84a3fd631
Author: xiangzihao <[email protected]>
AuthorDate: Thu Oct 12 08:49:54 2023 -0500
[Fix-3153] Alert information cannot be sent in cancelling state (#3236)
* fix issue 3153
* fix spotless error
---
.../console/core/task/FlinkRESTAPIWatcher.java | 29 ++++++++++++++++------
1 file changed, 21 insertions(+), 8 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
index 4ebbf75cd..b734a7a03 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java
@@ -501,15 +501,28 @@ public class FlinkRESTAPIWatcher {
Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
if (flag != null) {
log.info("FlinkRESTAPIWatcher previous state: canceling.");
- if (StopFrom.NONE.equals(stopFrom)) {
- log.error(
- "FlinkRESTAPIWatcher query previous state was canceling and
stopFrom NotFound,savePoint expired!");
- savePointService.expire(application.getId());
+ FlinkAppState flinkAppState = FlinkAppState.CANCELED;
+ try {
+ YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
+ if (yarnAppInfo != null) {
+ String state = yarnAppInfo.getApp().getFinalStatus();
+ flinkAppState = FlinkAppState.of(state);
+ }
+ } finally {
+ if (StopFrom.NONE.equals(stopFrom)) {
+ log.error(
+ "FlinkRESTAPIWatcher query previous state was canceling and
stopFrom NotFound,savePoint expired!");
+ savePointService.expire(application.getId());
+ if (flinkAppState == FlinkAppState.KILLED || flinkAppState ==
FlinkAppState.FAILED) {
+ alertService.alert(application, flinkAppState);
+ }
+ }
+ application.setState(flinkAppState.getValue());
+ cleanSavepoint(application);
+ cleanOptioning(optionState, application.getId());
+ doPersistMetrics(application, true);
}
- application.setState(FlinkAppState.CANCELED.getValue());
- cleanSavepoint(application);
- cleanOptioning(optionState, application.getId());
- doPersistMetrics(application, true);
+
} else {
// query the status from the yarn rest Api
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);