1996fanrui commented on code in PR #2268:
URL: 
https://github.com/apache/incubator-streampark/pull/2268#discussion_r1109821875


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/metrics/flink/CheckPoints.java:
##########
@@ -82,5 +96,20 @@ public String getPath() {
   @Data
   public static class Latest implements Serializable {
     private CheckPoint completed;
+    private CheckPoint savepoint;
+
+    @JsonIgnore
+    public List<CheckPoint> getLatestCheckpoint() {
+      List<CheckPoint> checkPoints = new ArrayList<>();
+      if (Objects.nonNull(completed)) {
+        checkPoints.add(completed);
+      }
+      if (Objects.nonNull(savepoint)) {
+        checkPoints.add(savepoint);
+      }
+      return checkPoints.stream()
+          .sorted(Comparator.comparingLong(chk -> chk.id))

Review Comment:
   The sort isn't necessary, right?
   
   If so, we can return the checkpoints directly.



##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionSubmit.scala:
##########
@@ -212,4 +212,9 @@ object KubernetesNativeSessionSubmit extends 
KubernetesNativeSubmitTrait with Lo
       Utils.close(kubeClient)
     }
   }
+
+  override def doTriggerSavepoint(request: TriggerSavepointRequest, 
flinkConfig: Configuration): SavepointResponse = {
+    flinkConfig.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)

Review Comment:
   ```suggestion
       flinkConfig.safeSet(DeploymentOptions.TARGET, ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
   ```
   
   Is this a bug? This is the session submit, but the target is set to `KUBERNETES_NATIVE_APPLICATION`.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java:
##########
@@ -54,6 +85,24 @@ public class SavePointServiceImpl extends 
ServiceImpl<SavePointMapper, SavePoint
 
   @Autowired private ApplicationService applicationService;
 
+  @Autowired private ApplicationConfigService configService;
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  @Autowired private ApplicationLogService applicationLogService;
+
+  @Autowired private FlinkRESTAPIWatcher flinkRESTAPIWatcher;
+
+  private final ExecutorService executorService =
+      new ThreadPoolExecutor(
+          Runtime.getRuntime().availableProcessors() * 5,
+          Runtime.getRuntime().availableProcessors() * 10,
+          60L,
+          TimeUnit.SECONDS,
+          new LinkedBlockingQueue<>(1024),
+          ThreadUtils.threadFactory("streampark-trigger-savepoint-executor"),

Review Comment:
   ```suggestion
             ThreadUtils.threadFactory("trigger-savepoint-executor"),
   ```
   
   The `streampark` prefix isn't needed, since all thread pools belong to StreamPark 😂



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java:
##########
@@ -52,30 +66,29 @@ public class CheckpointProcessor {
 
   @Autowired private SavePointService savePointService;
 
-  public void process(Long appId, CheckPoints checkPoints) {
-    CheckPoints.Latest latest = checkPoints.getLatest();
-    if (latest == null || latest.getCompleted() == null) {
-      return;
-    }
-    CheckPoints.CheckPoint checkPoint = latest.getCompleted();
+  public void process(Long appId, String jobID, @Nonnull CheckPoints 
checkPoints) {
+    checkPoints.getLatestCheckpoint().forEach(checkPoint -> process(appId, 
jobID, checkPoint));
+  }
+
+  private void process(Long appId, String jobID, @Nonnull 
CheckPoints.CheckPoint checkPoint) {
     Application application = applicationService.getById(appId);

Review Comment:
   This query can be removed if we pass the required parameters from the caller.
   That would reduce the load on the DB.
   
   For example, jobID, application.cpFailedTrigger(), etc.



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java:
##########
@@ -52,30 +66,29 @@ public class CheckpointProcessor {
 
   @Autowired private SavePointService savePointService;
 
-  public void process(Long appId, CheckPoints checkPoints) {
-    CheckPoints.Latest latest = checkPoints.getLatest();
-    if (latest == null || latest.getCompleted() == null) {
-      return;
-    }
-    CheckPoints.CheckPoint checkPoint = latest.getCompleted();
+  public void process(Long appId, String jobID, @Nonnull CheckPoints 
checkPoints) {
+    checkPoints.getLatestCheckpoint().forEach(checkPoint -> process(appId, 
jobID, checkPoint));
+  }
+
+  private void process(Long appId, String jobID, @Nonnull 
CheckPoints.CheckPoint checkPoint) {
     Application application = applicationService.getById(appId);
     CheckPointStatus status = checkPoint.getCheckPointStatus();
 
     if (CheckPointStatus.COMPLETED.equals(status)) {
-      String cacheId = appId + "_" + application.getJobId();
-      Long latestId =
-          checkPointCache.get(
-              cacheId,
-              key -> {
-                SavePoint savePoint = savePointService.getLatest(appId);
-                return 
Optional.ofNullable(savePoint).map(SavePoint::getChkId).orElse(null);
-              });
-
-      if (latestId == null || latestId < checkPoint.getId()) {
-        saveSavepoint(checkPoint, application);
+      String cacheId = getCacheIdForCheckpoint(appId, application.getJobId());

Review Comment:
   ```suggestion
         String cacheId = getCacheIdForCheckpoint(appId, jobID);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to