This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 7ded9ae03 [Bug] trigger savepoint bug fixed
7ded9ae03 is described below

commit 7ded9ae03eb8d81e67ef01bf6a79d17625329814
Author: benjobs <[email protected]>
AuthorDate: Wed Mar 6 16:49:36 2024 +0800

    [Bug] trigger savepoint bug fixed
---
 .../console/core/service/impl/SavePointServiceImpl.java   | 15 ++-------------
 1 file changed, 2 insertions(+), 13 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index bf6ca3877..687026e4a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -99,7 +99,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
 
   private final ExecutorService flinkTriggerExecutor =
       new ThreadPoolExecutor(
-          1,
+          CPU_NUM,
           CPU_NUM,
           60L,
           TimeUnit.SECONDS,
@@ -335,18 +335,6 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
               if (savepointResponse != null && savepointResponse.savePointDir() != null) {
                 applicationLog.setSuccess(true);
                 String savePointDir = savepointResponse.savePointDir();
-
-                SavePoint savePoint = new SavePoint();
-                savePoint.setAppId(application.getId());
-                savePoint.setLatest(true);
-                savePoint.setType(CheckPointType.SAVEPOINT.get());
-
-                Date date = new Date();
-                savePoint.setCreateTime(date);
-                savePoint.setTriggerTime(date);
-                savePoint.setPath(savePointDir);
-                this.save(savePoint);
-
                 log.info("Request savepoint successful, savepointDir: {}", savePointDir);
               }
             },
@@ -364,6 +352,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
               application.setOptionState(OptionState.NONE.getValue());
               application.setOptionTime(new Date());
               applicationService.update(application);
+              flinkAppHttpWatcher.cleanSavepoint(application);
               flinkAppHttpWatcher.initialize();
             });
   }

Reply via email to