This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 131685178 [Bug] Failed to mark the latest savepoint and checkpoint
#1989 (#1991)
131685178 is described below
commit 131685178659b0c6fdc8b4263457e366f58e6116
Author: xujiangfeng001 <[email protected]>
AuthorDate: Wed Nov 9 15:44:30 2022 +0800
[Bug] Failed to mark the latest savepoint and checkpoint #1989 (#1991)
* [Bug] Failed to mark the latest savepoint and checkpoint #1989
---
.../console/core/service/impl/ApplicationServiceImpl.java | 5 +++--
.../apache/streampark/console/core/task/FlinkTrackingTask.java | 8 +++++---
2 files changed, 8 insertions(+), 5 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 980a1bc92..6e86f0240 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -449,8 +449,9 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (!FlinkAppState.CANCELED.equals(state)) {
return false;
}
- Long useId = FlinkTrackingTask.getCanceledJobUserId(appId);
- return useId == null || application.getUserId().longValue() !=
FlinkTrackingTask.getCanceledJobUserId(appId).longValue();
+ long cancelUserId =
FlinkTrackingTask.getCanceledJobUserId(appId).longValue();
+ long appUserId = application.getUserId().longValue();
+ return cancelUserId != -1 && cancelUserId != appUserId;
}
private void removeApp(Application application) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 64b406834..859a87e4f 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -416,8 +416,10 @@ public class FlinkTrackingTask {
cleanSavepoint(application);
application.setState(currentState.getValue());
if (StopFrom.NONE.equals(stopFrom) ||
applicationService.checkAlter(application)) {
- log.info("flinkTrackingTask getFromFlinkRestApi, job
cancel is not form StreamPark,savePoint obsoleted!");
- savePointService.obsolete(application.getId());
+ if (StopFrom.NONE.equals(stopFrom)) {
+ log.info("flinkTrackingTask getFromFlinkRestApi, job
cancel is not form StreamPark,savePoint obsoleted!");
+ savePointService.obsolete(application.getId());
+ }
stopCanceledJob(application.getId());
alertService.alert(application, FlinkAppState.CANCELED);
}
@@ -662,7 +664,7 @@ public class FlinkTrackingTask {
}
public static Long getCanceledJobUserId(Long appId) {
- return CANCELLED_JOB_MAP.get(appId);
+ return CANCELLED_JOB_MAP.get(appId) == null ? Long.valueOf(-1) :
CANCELLED_JOB_MAP.get(appId);
}
public static Map<Long, Application> getAllTrackingApp() {