This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6993ff41f [Issue-2534][Improve] Optimize methods for Savepoint service. (#2592)
6993ff41f is described below

commit 6993ff41f0ac7ce0a0c272d6bc0ba3496a6bc8dd
Author: Roc Marshal <[email protected]>
AuthorDate: Thu May 11 23:02:05 2023 +0800

    [Issue-2534][Improve] Optimize methods for Savepoint service. (#2592)
    
    * [Issue-2534][Improve] Optimize methods for Savepoint service.
    
    * Add License header
    
    * Updated based on review comments
---
 .../core/service/impl/SavePointServiceImpl.java    | 411 ++++++++++++---------
 .../console/core/service/SavePointServiceTest.java | 182 +++++++++
 2 files changed, 414 insertions(+), 179 deletions(-)
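
For orientation: the heart of this change replaces the nested if/else in getSavepointPath with an early-return chain over three extracted, individually testable helpers. A minimal sketch of the resulting resolution order (method names as in the diff below; the surrounding class is elided):

    // Savepoint path resolution after the refactor, highest priority first:
    //   1) -Dstate.savepoints.dir from the application's dynamic properties
    //   2) the effective application config (streampark|flinksql jobs only)
    //   3) the deployment layer: remote cluster config, else flink-conf.yaml
    String savepointPath = getSavepointFromDynamicProps(application.getDynamicProperties());
    if (StringUtils.isNotBlank(savepointPath)) {
      return savepointPath;
    }
    savepointPath = getSavepointFromAppCfgIfStreamParkOrSQLJob(application);
    if (StringUtils.isNotBlank(savepointPath)) {
      return savepointPath;
    }
    return getSavepointFromDeployLayer(application);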

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 843a34850..22be1d5e1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.CompletableFutureUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.Constant;
@@ -50,14 +49,16 @@ import org.apache.streampark.flink.client.bean.SavepointResponse;
 import org.apache.streampark.flink.client.bean.TriggerSavepointRequest;
 import org.apache.streampark.flink.util.FlinkUtils;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.RestOptions;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -71,6 +72,7 @@ import java.net.URI;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -78,6 +80,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
+import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
+import static org.apache.streampark.common.util.PropertiesUtils.extractDynamicPropertiesAsJava;
+import static org.apache.streampark.console.core.enums.CheckPointType.CHECKPOINT;
+
 @Slf4j
 @Service
 @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
@@ -122,98 +129,6 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
     return super.save(entity);
   }
 
-  private void expire(SavePoint entity) {
-    FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
-    Application application = applicationService.getById(entity.getAppId());
-    Utils.notNull(flinkEnv);
-    Utils.notNull(application);
-
-    String numRetainedKey = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key();
-    String numRetainedFromDynamicProp =
-        PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
-            .get(numRetainedKey);
-
-    int cpThreshold = 0;
-    if (numRetainedFromDynamicProp != null) {
-      try {
-        int value = Integer.parseInt(numRetainedFromDynamicProp.trim());
-        if (value > 0) {
-          cpThreshold = value;
-        } else {
-          log.warn(
-              "this value of dynamicProperties key: 
state.checkpoints.num-retained is invalid, must be gt 0");
-        }
-      } catch (NumberFormatException e) {
-        log.warn(
-            "this value of dynamicProperties key: 
state.checkpoints.num-retained invalid, must be number");
-      }
-    }
-
-    if (cpThreshold == 0) {
-      String flinkConfNumRetained = flinkEnv.convertFlinkYamlAsMap().get(numRetainedKey);
-      int numRetainedDefaultValue = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
-      if (flinkConfNumRetained != null) {
-        try {
-          int value = Integer.parseInt(flinkConfNumRetained.trim());
-          if (value > 0) {
-            cpThreshold = value;
-          } else {
-            cpThreshold = numRetainedDefaultValue;
-            log.warn(
-                "the value of key: state.checkpoints.num-retained in 
flink-conf.yaml is invalid, must be gt 0, default value: {} will be use",
-                numRetainedDefaultValue);
-          }
-        } catch (NumberFormatException e) {
-          cpThreshold = numRetainedDefaultValue;
-          log.warn(
-              "the value of key: state.checkpoints.num-retained in 
flink-conf.yaml is invalid, must be number, flink env: {}, default value: {} 
will be use",
-              flinkEnv.getFlinkHome(),
-              flinkConfNumRetained);
-        }
-      } else {
-        cpThreshold = numRetainedDefaultValue;
-        log.info(
-            "the application: {} is not set {} in dynamicProperties or value 
is invalid, and flink-conf.yaml is the same problem of flink env: {}, default 
value: {} will be use.",
-            application.getJobName(),
-            numRetainedKey,
-            flinkEnv.getFlinkHome(),
-            numRetainedDefaultValue);
-      }
-    }
-
-    if (CheckPointType.CHECKPOINT.equals(CheckPointType.of(entity.getType()))) {
-      cpThreshold = cpThreshold - 1;
-    }
-
-    if (cpThreshold == 0) {
-      LambdaQueryWrapper<SavePoint> queryWrapper =
-          new LambdaQueryWrapper<SavePoint>()
-              .eq(SavePoint::getAppId, entity.getAppId())
-              .eq(SavePoint::getType, 1);
-      this.remove(queryWrapper);
-    } else {
-      LambdaQueryWrapper<SavePoint> queryWrapper =
-          new LambdaQueryWrapper<SavePoint>()
-              .select(SavePoint::getTriggerTime)
-              .eq(SavePoint::getAppId, entity.getAppId())
-              .eq(SavePoint::getType, CheckPointType.CHECKPOINT.get())
-              .orderByDesc(SavePoint::getTriggerTime);
-
-      Page<SavePoint> savePointPage =
-          this.baseMapper.selectPage(new Page<>(1, cpThreshold + 1), queryWrapper);
-      if (!savePointPage.getRecords().isEmpty()
-          && savePointPage.getRecords().size() > cpThreshold) {
-        SavePoint savePoint = savePointPage.getRecords().get(cpThreshold - 1);
-        LambdaQueryWrapper<SavePoint> lambdaQueryWrapper =
-            new LambdaQueryWrapper<SavePoint>()
-                .eq(SavePoint::getAppId, entity.getAppId())
-                .eq(SavePoint::getType, 1)
-                .lt(SavePoint::getTriggerTime, savePoint.getTriggerTime());
-        this.remove(lambdaQueryWrapper);
-      }
-    }
-  }
-
   @Override
   public SavePoint getLatest(Long id) {
     LambdaQueryWrapper<SavePoint> queryWrapper =
@@ -228,53 +143,25 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
     Application application = applicationService.getById(appParam.getId());
 
     // 1) properties have the highest priority, read the properties that are set: -Dstate.savepoints.dir
-    String savepointPath =
-        PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
-            .get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+    String savepointPath = getSavepointFromDynamicProps(application.getDynamicProperties());
+    if (StringUtils.isNotBlank(savepointPath)) {
+      return savepointPath;
+    }
 
     // Application conf configuration has the second priority. If it is a streampark|flinksql type
-    // task,
-    // see if Application conf is configured when the task is defined, if checkpoints are configured
-    // and enabled,
-    // read `state.savepoints.dir`
-    if (StringUtils.isBlank(savepointPath)) {
-      if (application.isStreamParkJob() || application.isFlinkSqlJob()) {
-        ApplicationConfig applicationConfig = configService.getEffective(application.getId());
-        if (applicationConfig != null) {
-          Map<String, String> map = applicationConfig.readConfig();
-          if (FlinkUtils.isCheckpointEnabled(map)) {
-            savepointPath = map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-          }
-        }
-      }
+    // task, see if Application conf is configured when the task is defined, if checkpoints are
+    // configured
+    // and enabled, read `state.savepoints.dir`
+    savepointPath = getSavepointFromAppCfgIfStreamParkOrSQLJob(application);
+    if (StringUtils.isNotBlank(savepointPath)) {
+      return savepointPath;
     }
 
     // 3) If the savepoint is not obtained above, try to obtain the savepoint path according to the
     // deployment type (remote|on yarn)
-    if (StringUtils.isBlank(savepointPath)) {
-      // 3.1) At the remote mode, request the flink webui interface to get the savepoint path
-      if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
-        FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
-        Utils.notNull(
-            cluster,
-            String.format(
-                "The clusterId=%s cannot be find, maybe the clusterId is wrong 
or "
-                    + "the cluster has been deleted. Please contact the 
Admin.",
-                application.getFlinkClusterId()));
-        Map<String, String> config = cluster.getFlinkConfig();
-        if (!config.isEmpty()) {
-          savepointPath = config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-        }
-      } else {
-        // 3.2) At the yarn or k8s mode, then read the savepoint in flink-conf.yml in the bound
-        // flink
-        FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
-        savepointPath =
-            flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-      }
-    }
-
-    return savepointPath;
+    // 3.1) At the remote mode, request the flink webui interface to get the savepoint path
+    // 3.2) At the yarn or k8s mode, then read the savepoint in flink-conf.yml in the bound flink
+    return getSavepointFromDeployLayer(application);
   }
 
   @Override
@@ -299,22 +186,8 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
     FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
 
     // infer savepoint
-    String customSavepoint = this.getFinalSavepointDir(savepointPath, application);
-
-    FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
-    String clusterId = getClusterId(application, cluster);
-
-    Map<String, Object> properties = this.tryGetRestProps(application, cluster);
-
     TriggerSavepointRequest request =
-        new TriggerSavepointRequest(
-            flinkEnv.getFlinkVersion(),
-            application.getExecutionModeEnum(),
-            properties,
-            clusterId,
-            application.getJobId(),
-            customSavepoint,
-            application.getK8sNamespace());
+        renderTriggerSavepointRequest(savepointPath, application, flinkEnv);
 
     CompletableFuture<SavepointResponse> savepointFuture =
         CompletableFuture.supplyAsync(() -> FlinkClient.triggerSavepoint(request), executorService);
@@ -322,6 +195,48 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
     handleSavepointResponseFuture(application, applicationLog, savepointFuture);
   }
 
+  @Override
+  @Transactional(rollbackFor = Exception.class)
+  public Boolean delete(Long id, Application application) throws InternalException {
+    SavePoint savePoint = getById(id);
+    try {
+      if (CommonUtils.notEmpty(savePoint.getPath())) {
+        application.getFsOperator().delete(savePoint.getPath());
+      }
+      return removeById(id);
+    } catch (Exception e) {
+      throw new InternalException(e.getMessage());
+    }
+  }
+
+  @Override
+  public IPage<SavePoint> page(SavePoint savePoint, RestRequest request) {
+    Page<SavePoint> page =
+        new MybatisPager<SavePoint>().getPage(request, "trigger_time", Constant.ORDER_DESC);
+    LambdaQueryWrapper<SavePoint> queryWrapper =
+        new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, savePoint.getAppId());
+    return this.page(page, queryWrapper);
+  }
+
+  @Override
+  public void removeApp(Application application) {
+    Long appId = application.getId();
+
+    LambdaQueryWrapper<SavePoint> queryWrapper =
+        new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
+    this.remove(queryWrapper);
+
+    try {
+      application
+          .getFsOperator()
+          .delete(application.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString()));
+    } catch (Exception e) {
+      log.error(e.getMessage(), e);
+    }
+  }
+
+  // private methods.
+
   private void handleSavepointResponseFuture(
       Application application,
       ApplicationLog applicationLog,
@@ -405,44 +320,182 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
     return null;
   }
 
-  @Override
-  @Transactional(rollbackFor = Exception.class)
-  public Boolean delete(Long id, Application application) throws 
InternalException {
-    SavePoint savePoint = getById(id);
+  /**
+   * Try to get the savepoint config item from the dynamic properties.
+   *
+   * @param dynamicProps dynamic properties string.
+   * @return the value of the savepoint in the dynamic properties.
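+   *     For example, "-Dstate.savepoints.dir=hdfs:///test" yields "hdfs:///test"; an absent key
+   *     yields null and an empty value yields "" (behavior pinned by SavePointServiceTest).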
+   */
+  @VisibleForTesting
+  @Nullable
+  public String getSavepointFromDynamicProps(String dynamicProps) {
+    return extractDynamicPropertiesAsJava(dynamicProps).get(SAVEPOINT_DIRECTORY.key());
+  }
+
+  /**
+   * Try to obtain the savepoint path if it is a streampark|flinksql type task. See if Application
+   * conf is configured when the task is defined and, if checkpoints are configured and enabled, read
+   * `state.savepoints.dir`.
+   *
+   * @param application the target application.
+   * @return the value of the savepoint if it exists.
+   */
+  @VisibleForTesting
+  @Nullable
+  public String getSavepointFromAppCfgIfStreamParkOrSQLJob(Application application) {
+    if (!application.isStreamParkJob() && !application.isFlinkSqlJob()) {
+      return null;
+    }
+    ApplicationConfig applicationConfig = configService.getEffective(application.getId());
+    if (applicationConfig == null) {
+      return null;
+    }
+    Map<String, String> map = applicationConfig.readConfig();
+    return FlinkUtils.isCheckpointEnabled(map) ? map.get(SAVEPOINT_DIRECTORY.key()) : null;
+  }
+
+  /**
+   * Try to obtain the savepoint path according to the deployment type (remote|on yarn). In remote
+   * mode, request the flink webui interface to get the savepoint path. In yarn or k8s mode, read
+   * the savepoint from flink-conf.yaml in the bound flink.
+   *
+   * @param application the target application.
+   * @return the value of the savepoint if it exists.
+   */
+  @VisibleForTesting
+  @Nullable
+  public String getSavepointFromDeployLayer(Application application)
+      throws JsonProcessingException {
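+    // Behavior pinned by SavePointServiceTest#testGetSavepointFromDeployLayer: yarn/k8s mode
+    // reads the bound flink env's flink-conf.yaml; remote mode requires a resolvable cluster
+    // and fails fast via Utils.notNull otherwise.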
+    // In yarn or k8s mode, read the savepoint from flink-conf.yaml in the bound flink
+    if (!ExecutionMode.isRemoteMode(application.getExecutionMode())) {
+      FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
+      return flinkEnv.convertFlinkYamlAsMap().get(SAVEPOINT_DIRECTORY.key());
+    }
+
+    // In remote mode, request the flink webui interface to get the savepoint path
+    FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
+    Utils.notNull(
+        cluster,
+        String.format(
+            "The clusterId=%s cannot be find, maybe the clusterId is wrong or "
+                + "the cluster has been deleted. Please contact the Admin.",
+            application.getFlinkClusterId()));
+    Map<String, String> config = cluster.getFlinkConfig();
+    return config.isEmpty() ? null : config.get(SAVEPOINT_DIRECTORY.key());
+  }
+
+  /** Try to get the 'state.checkpoints.num-retained' from the dynamic properties. */
+  private Optional<Integer> tryGetChkNumRetainedFromDynamicProps(String dynamicProps) {
+    String rawCfgValue =
+        extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key());
+    if (StringUtils.isEmpty(rawCfgValue)) {
+      return Optional.empty();
+    }
     try {
-      if (CommonUtils.notEmpty(savePoint.getPath())) {
-        application.getFsOperator().delete(savePoint.getPath());
+      int value = Integer.parseInt(rawCfgValue.trim());
+      if (value > 0) {
+        return Optional.of(value);
       }
-      removeById(id);
-      return true;
-    } catch (Exception e) {
-      throw new InternalException(e.getMessage());
+      log.warn(
+          "This value of dynamicProperties key: state.checkpoints.num-retained 
is invalid, must be gt 0");
+    } catch (NumberFormatException e) {
+      log.warn(
+          "This value of dynamicProperties key: state.checkpoints.num-retained 
invalid, must be number");
     }
+    return Optional.empty();
   }
 
-  @Override
-  public IPage<SavePoint> page(SavePoint savePoint, RestRequest request) {
-    Page<SavePoint> page =
-        new MybatisPager<SavePoint>().getPage(request, "trigger_time", Constant.ORDER_DESC);
-    LambdaQueryWrapper<SavePoint> queryWrapper =
-        new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, savePoint.getAppId());
-    return this.page(page, queryWrapper);
+  /** Try to get the 'state.checkpoints.num-retained' from the flink env. */
+  private int getChkNumRetainedFromFlinkEnv(
+      @Nonnull FlinkEnv flinkEnv, @Nonnull Application application) {
+    String flinkConfNumRetained =
+        flinkEnv.convertFlinkYamlAsMap().get(MAX_RETAINED_CHECKPOINTS.key());
+    if (StringUtils.isEmpty(flinkConfNumRetained)) {
+      log.info(
+          "The application: {} is not set {} in dynamicProperties or value is 
invalid, and flink-conf.yaml is the same problem of flink env: {}, default 
value: {} will be use.",
+          application.getJobName(),
+          MAX_RETAINED_CHECKPOINTS.key(),
+          flinkEnv.getFlinkHome(),
+          MAX_RETAINED_CHECKPOINTS.defaultValue());
+      return MAX_RETAINED_CHECKPOINTS.defaultValue();
+    }
+    try {
+      int value = Integer.parseInt(flinkConfNumRetained.trim());
+      if (value > 0) {
+        return value;
+      }
+      log.warn(
+          "The value of key: state.checkpoints.num-retained in flink-conf.yaml 
is invalid, must be gt 0, default value: {} will be use",
+          MAX_RETAINED_CHECKPOINTS.defaultValue());
+    } catch (NumberFormatException e) {
+      log.warn(
+          "The value of key: state.checkpoints.num-retained in flink-conf.yaml 
is invalid, must be number, flink env: {}, default value: {} will be use",
+          flinkEnv.getFlinkHome(),
+          flinkConfNumRetained);
+    }
+    return MAX_RETAINED_CHECKPOINTS.defaultValue();
   }
 
-  @Override
-  public void removeApp(Application application) {
-    Long appId = application.getId();
+  private void expire(SavePoint entity) {
+    FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
+    Application application = applicationService.getById(entity.getAppId());
+    Utils.notNull(flinkEnv);
+    Utils.notNull(application);
 
-    LambdaQueryWrapper<SavePoint> queryWrapper =
-        new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
-    this.remove(queryWrapper);
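+    // NOTE: Optional.orElse evaluates its argument eagerly, so the flink-env lookup below
+    // (including its logging) runs even when the dynamic properties already supply a valid
+    // value; orElseGet(() -> ...) would defer it.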
+    int cpThreshold =
+        tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties())
+            .orElse(getChkNumRetainedFromFlinkEnv(flinkEnv, application));
+    cpThreshold =
+        CHECKPOINT.equals(CheckPointType.of(entity.getType())) ? cpThreshold - 1 : cpThreshold;
 
-    try {
-      application
-          .getFsOperator()
-          .delete(application.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString()));
-    } catch (Exception e) {
-      log.error(e.getMessage(), e);
+    if (cpThreshold == 0) {
+      LambdaQueryWrapper<SavePoint> queryWrapper =
+          new LambdaQueryWrapper<SavePoint>()
+              .eq(SavePoint::getAppId, entity.getAppId())
+              .eq(SavePoint::getType, CHECKPOINT.get());
+      this.remove(queryWrapper);
+      return;
+    }
+
+    LambdaQueryWrapper<SavePoint> queryWrapper =
+        new LambdaQueryWrapper<SavePoint>()
+            .select(SavePoint::getTriggerTime)
+            .eq(SavePoint::getAppId, entity.getAppId())
+            .eq(SavePoint::getType, CHECKPOINT.get())
+            .orderByDesc(SavePoint::getTriggerTime);
+
+    Page<SavePoint> savePointPage =
+        this.baseMapper.selectPage(new Page<>(1, cpThreshold + 1), queryWrapper);
+    if (CollectionUtils.isEmpty(savePointPage.getRecords())
+        || savePointPage.getRecords().size() <= cpThreshold) {
+      return;
     }
+    SavePoint savePoint = savePointPage.getRecords().get(cpThreshold - 1);
+    LambdaQueryWrapper<SavePoint> lambdaQueryWrapper =
+        new LambdaQueryWrapper<SavePoint>()
+            .eq(SavePoint::getAppId, entity.getAppId())
+            .eq(SavePoint::getType, CHECKPOINT.get())
+            .lt(SavePoint::getTriggerTime, savePoint.getTriggerTime());
+    this.remove(lambdaQueryWrapper);
+  }
+
+  @Nonnull
+  private TriggerSavepointRequest renderTriggerSavepointRequest(
+      @Nullable String savepointPath, Application application, FlinkEnv flinkEnv) {
+    String customSavepoint = this.getFinalSavepointDir(savepointPath, application);
+
+    FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
+    String clusterId = getClusterId(application, cluster);
+
+    Map<String, Object> properties = this.tryGetRestProps(application, cluster);
+
+    return new TriggerSavepointRequest(
+        flinkEnv.getFlinkVersion(),
+        application.getExecutionModeEnum(),
+        properties,
+        clusterId,
+        application.getJobId(),
+        customSavepoint,
+        application.getK8sNamespace());
   }
 }
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
new file mode 100644
index 000000000..53133143b
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.common.enums.ApplicationType;
+import org.apache.streampark.common.enums.DevelopmentMode;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.ApplicationConfig;
+import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.enums.ConfigFileType;
+import org.apache.streampark.console.core.enums.EffectiveType;
+import org.apache.streampark.console.core.service.impl.SavePointServiceImpl;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Test class for the implementation {@link
+ * org.apache.streampark.console.core.service.impl.SavePointServiceImpl} of {@link
+ * SavePointService}.
+ */
+class SavePointServiceTest extends SpringTestBase {
+
+  @Autowired private SavePointService savePointService;
+
+  @Autowired private ApplicationConfigService configService;
+
+  @Autowired private EffectiveService effectiveService;
+
+  @Autowired private FlinkEnvService flinkEnvService;
+  @Autowired private FlinkClusterService flinkClusterService;
+  @Autowired ApplicationService applicationService;
+
+  @AfterEach
+  void cleanTestRecordsInDatabase() {
+    savePointService.remove(new QueryWrapper<>());
+    configService.remove(new QueryWrapper<>());
+    effectiveService.remove(new QueryWrapper<>());
+    flinkEnvService.remove(new QueryWrapper<>());
+    flinkClusterService.remove(new QueryWrapper<>());
+    applicationService.remove(new QueryWrapper<>());
+  }
+
+  /**
+   * This part will be migrated into the corresponding test cases about
+   * PropertiesUtils.extractDynamicPropertiesAsJava.
+   */
+  @Test
+  void testGetSavepointFromDynamicProps() {
+    String propsWithEmptyTargetValue = "-Dstate.savepoints.dir=";
+    String props = "-Dstate.savepoints.dir=hdfs:///test";
+    SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl) savePointService;
+
+    assertThat(savePointServiceImpl.getSavepointFromDynamicProps(null)).isNull();
+    assertThat(savePointServiceImpl.getSavepointFromDynamicProps(props)).isEqualTo("hdfs:///test");
+    assertThat(savePointServiceImpl.getSavepointFromDynamicProps(propsWithEmptyTargetValue))
+        .isEmpty();
+  }
+
+  @Test
+  void testGetSavepointFromAppCfgIfStreamParkOrSQLJob() {
+    SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl) savePointService;
+    Application app = new Application();
+    Long appId = 1L;
+    Long appCfgId = 1L;
+    app.setId(appId);
+
+    // Test for non-(StreamPark job or FlinkSQL job)
+    app.setAppType(ApplicationType.APACHE_FLINK.getType());
+    assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+    app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
+    app.setJobType(DevelopmentMode.CUSTOM_CODE.getValue());
+    assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+
+    // Test for (StreamPark job or FlinkSQL job) without application config.
+    app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
+    assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+    app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
+    app.setJobType(DevelopmentMode.CUSTOM_CODE.getValue());
+    assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+
+    // Test for (StreamPark job or FlinkSQL job) with an application config that just disables checkpointing.
+    ApplicationConfig appCfg = new ApplicationConfig();
+    appCfg.setId(appCfgId);
+    appCfg.setAppId(appId);
+    appCfg.setContent("state.savepoints.dir=hdfs:///test");
+    appCfg.setFormat(ConfigFileType.PROPERTIES.getValue());
+    configService.save(appCfg);
+    assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+
+    // Test for (StreamPark job or FlinkSQL job) with application config, checkpointing enabled,
+    // and a configured value.
+
+    // Test with no value for CHECKPOINTING_INTERVAL
+    appCfg.setContent("");
+    configService.updateById(appCfg);
+    assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+
+    // Test for configured CHECKPOINTING_INTERVAL
+    appCfg.setContent(
+        DeflaterUtils.zipString(
+            "state.savepoints.dir=hdfs:///test\n"
+                + String.format("%s=%s", CHECKPOINTING_INTERVAL.key(), "3min")));
+    configService.updateById(appCfg);
+    Effective effective = new Effective();
+    effective.setTargetId(appCfg.getId());
+    effective.setAppId(appId);
+    effective.setTargetType(EffectiveType.CONFIG.getType());
+    effectiveService.save(effective);
+    assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app))
+        .isEqualTo("hdfs:///test");
+  }
+
+  @Test
+  void testGetSavepointFromDeployLayer() throws JsonProcessingException {
+    SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl) savePointService;
+    Long appId = 1L;
+    Long idOfFlinkEnv = 1L;
+    Long teamId = 1L;
+    Application application = new Application();
+    application.setId(appId);
+    application.setTeamId(teamId);
+    application.setVersionId(idOfFlinkEnv);
+    application.setExecutionMode(ExecutionMode.YARN_APPLICATION.getMode());
+    applicationService.save(application);
+
+    FlinkEnv flinkEnv = new FlinkEnv();
+    flinkEnv.setFlinkName("mockFlinkName");
+    flinkEnv.setFlinkHome("/tmp");
+    flinkEnv.setId(idOfFlinkEnv);
+    flinkEnv.setVersion("1.15.3");
+    flinkEnv.setScalaVersion("2.12");
+    flinkEnv.setFlinkConf(DeflaterUtils.zipString(SAVEPOINT_DIRECTORY.key() + ": hdfs:///test"));
+    flinkEnvService.save(flinkEnv);
+
+    // Test for non-remote mode
+    assertThat(savePointServiceImpl.getSavepointFromDeployLayer(application))
+        .isEqualTo("hdfs:///test");
+
+    // Start the test lines for remote mode
+    Long clusterId = 1L;
+
+    // Test for it without cluster.
+    application.setExecutionMode(ExecutionMode.REMOTE.getMode());
+    application.setFlinkClusterId(clusterId);
+    assertThatThrownBy(() -> savePointServiceImpl.getSavepointFromDeployLayer(application))
+        .isInstanceOf(NullPointerException.class);
+
+    // Ignored.
+    // Test for it with empty config
+    // Test for it with the configured empty target value
+    // Test for it with the configured non-empty target value
+
+  }
+}
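
A note on the test idiom above: the helpers under test are exposed on the implementation via @VisibleForTesting rather than on the SavePointService interface, so each test downcasts the autowired bean. A condensed view of the pattern (types as in the diff; the cast assumes the single SavePointServiceImpl bean backs the interface):

    @Autowired private SavePointService savePointService;

    // Reach the test-only surface without widening the public interface.
    SavePointServiceImpl impl = (SavePointServiceImpl) savePointService;
    assertThat(impl.getSavepointFromDynamicProps("-Dstate.savepoints.dir=hdfs:///test"))
        .isEqualTo("hdfs:///test");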
