This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new bd2f754c4 [Fix-3153] Alert information cannot be sent in cancelling state (#3157)
bd2f754c4 is described below
commit bd2f754c4c0f80d03741a71761cb2159a0080ebf
Author: xiangzihao <[email protected]>
AuthorDate: Wed Oct 11 21:32:24 2023 -0500
[Fix-3153] Alert information cannot be sent in cancelling state (#3157)
* fix issue 3153
* optimize alert logic
* fix spotless check
---
.../console/core/watcher/FlinkAppHttpWatcher.java | 30 ++++++++++++++++------
1 file changed, 22 insertions(+), 8 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
index 16bf1c0ae..5bea3a165 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java
@@ -307,15 +307,29 @@ public class FlinkAppHttpWatcher {
if (flag != null) {
log.info("FlinkAppHttpWatcher previous state: canceling.");
- if (stopFromEnum.isNone()) {
- log.error(
- "FlinkAppHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!");
- savePointService.expire(application.getId());
+ FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.CANCELED;
+ try {
+ YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
+ if (yarnAppInfo != null) {
+ String state = yarnAppInfo.getApp().getFinalStatus();
+ flinkAppStateEnum = FlinkAppStateEnum.of(state);
+ }
+ } finally {
+ if (stopFromEnum.isNone()) {
+ log.error(
+ "FlinkAppHttpWatcher query previous state was canceling and stopFrom NotFound,savePoint expired!");
+ savePointService.expire(application.getId());
+ if (flinkAppStateEnum == FlinkAppStateEnum.KILLED
+ || flinkAppStateEnum == FlinkAppStateEnum.FAILED) {
+ doAlert(application, flinkAppStateEnum);
+ }
+ }
+ application.setState(flinkAppStateEnum.getValue());
+ cleanSavepoint(application);
+ cleanOptioning(optionStateEnum, application.getId());
+ doPersistMetrics(application, true);
}
- application.setState(FlinkAppStateEnum.CANCELED.getValue());
- cleanSavepoint(application);
- cleanOptioning(optionStateEnum, application.getId());
- doPersistMetrics(application, true);
+
} else {
// query the status from the yarn rest Api
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);