This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 131685178 [Bug] Failed to mark the latest savepoint and checkpoint 
#1989 (#1991)
131685178 is described below

commit 131685178659b0c6fdc8b4263457e366f58e6116
Author: xujiangfeng001 <[email protected]>
AuthorDate: Wed Nov 9 15:44:30 2022 +0800

    [Bug] Failed to mark the latest savepoint and checkpoint #1989 (#1991)
    
    * [Bug] Failed to mark the latest savepoint and checkpoint #1989
---
 .../console/core/service/impl/ApplicationServiceImpl.java         | 5 +++--
 .../apache/streampark/console/core/task/FlinkTrackingTask.java    | 8 +++++---
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 980a1bc92..6e86f0240 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -449,8 +449,9 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         if (!FlinkAppState.CANCELED.equals(state)) {
             return false;
         }
-        Long useId = FlinkTrackingTask.getCanceledJobUserId(appId);
-        return useId == null || application.getUserId().longValue() != 
FlinkTrackingTask.getCanceledJobUserId(appId).longValue();
+        long cancelUserId = 
FlinkTrackingTask.getCanceledJobUserId(appId).longValue();
+        long appUserId = application.getUserId().longValue();
+        return cancelUserId != -1 && cancelUserId != appUserId;
     }
 
     private void removeApp(Application application) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
index 64b406834..859a87e4f 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkTrackingTask.java
@@ -416,8 +416,10 @@ public class FlinkTrackingTask {
                 cleanSavepoint(application);
                 application.setState(currentState.getValue());
                 if (StopFrom.NONE.equals(stopFrom) || 
applicationService.checkAlter(application)) {
-                    log.info("flinkTrackingTask getFromFlinkRestApi, job 
cancel is not form StreamPark,savePoint obsoleted!");
-                    savePointService.obsolete(application.getId());
+                    if (StopFrom.NONE.equals(stopFrom)) {
+                        log.info("flinkTrackingTask getFromFlinkRestApi, job 
cancel is not form StreamPark,savePoint obsoleted!");
+                        savePointService.obsolete(application.getId());
+                    }
                     stopCanceledJob(application.getId());
                     alertService.alert(application, FlinkAppState.CANCELED);
                 }
@@ -662,7 +664,7 @@ public class FlinkTrackingTask {
     }
 
     public static Long getCanceledJobUserId(Long appId) {
-        return CANCELLED_JOB_MAP.get(appId);
+        return CANCELLED_JOB_MAP.get(appId) == null ? Long.valueOf(-1) : 
CANCELLED_JOB_MAP.get(appId);
     }
 
     public static Map<Long, Application> getAllTrackingApp() {

Reply via email to