This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 6993ff41f [Issue-2534][Improve] Optimize methods for Savepoint
service. (#2592)
6993ff41f is described below
commit 6993ff41f0ac7ce0a0c272d6bc0ba3496a6bc8dd
Author: Roc Marshal <[email protected]>
AuthorDate: Thu May 11 23:02:05 2023 +0800
[Issue-2534][Improve] Optimize methods for Savepoint service. (#2592)
* [Issue-2534][Improve] Optimize methods for Savepoint service.
* Add License header
* Updated based on review comments
---
.../core/service/impl/SavePointServiceImpl.java | 411 ++++++++++++---------
.../console/core/service/SavePointServiceTest.java | 182 +++++++++
2 files changed, 414 insertions(+), 179 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 843a34850..22be1d5e1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.CompletableFutureUtils;
-import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.Constant;
@@ -50,14 +49,16 @@ import
org.apache.streampark.flink.client.bean.SavepointResponse;
import org.apache.streampark.flink.client.bean.TriggerSavepointRequest;
import org.apache.streampark.flink.util.FlinkUtils;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.RestOptions;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -71,6 +72,7 @@ import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -78,6 +80,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static
org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
+import static
org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
+import static
org.apache.streampark.common.util.PropertiesUtils.extractDynamicPropertiesAsJava;
+import static
org.apache.streampark.console.core.enums.CheckPointType.CHECKPOINT;
+
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true,
rollbackFor = Exception.class)
@@ -122,98 +129,6 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
return super.save(entity);
}
- private void expire(SavePoint entity) {
- FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
- Application application = applicationService.getById(entity.getAppId());
- Utils.notNull(flinkEnv);
- Utils.notNull(application);
-
- String numRetainedKey =
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key();
- String numRetainedFromDynamicProp =
-
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
- .get(numRetainedKey);
-
- int cpThreshold = 0;
- if (numRetainedFromDynamicProp != null) {
- try {
- int value = Integer.parseInt(numRetainedFromDynamicProp.trim());
- if (value > 0) {
- cpThreshold = value;
- } else {
- log.warn(
- "this value of dynamicProperties key:
state.checkpoints.num-retained is invalid, must be gt 0");
- }
- } catch (NumberFormatException e) {
- log.warn(
- "this value of dynamicProperties key:
state.checkpoints.num-retained invalid, must be number");
- }
- }
-
- if (cpThreshold == 0) {
- String flinkConfNumRetained =
flinkEnv.convertFlinkYamlAsMap().get(numRetainedKey);
- int numRetainedDefaultValue =
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
- if (flinkConfNumRetained != null) {
- try {
- int value = Integer.parseInt(flinkConfNumRetained.trim());
- if (value > 0) {
- cpThreshold = value;
- } else {
- cpThreshold = numRetainedDefaultValue;
- log.warn(
- "the value of key: state.checkpoints.num-retained in
flink-conf.yaml is invalid, must be gt 0, default value: {} will be use",
- numRetainedDefaultValue);
- }
- } catch (NumberFormatException e) {
- cpThreshold = numRetainedDefaultValue;
- log.warn(
- "the value of key: state.checkpoints.num-retained in
flink-conf.yaml is invalid, must be number, flink env: {}, default value: {}
will be use",
- flinkEnv.getFlinkHome(),
- flinkConfNumRetained);
- }
- } else {
- cpThreshold = numRetainedDefaultValue;
- log.info(
- "the application: {} is not set {} in dynamicProperties or value
is invalid, and flink-conf.yaml is the same problem of flink env: {}, default
value: {} will be use.",
- application.getJobName(),
- numRetainedKey,
- flinkEnv.getFlinkHome(),
- numRetainedDefaultValue);
- }
- }
-
- if (CheckPointType.CHECKPOINT.equals(CheckPointType.of(entity.getType())))
{
- cpThreshold = cpThreshold - 1;
- }
-
- if (cpThreshold == 0) {
- LambdaQueryWrapper<SavePoint> queryWrapper =
- new LambdaQueryWrapper<SavePoint>()
- .eq(SavePoint::getAppId, entity.getAppId())
- .eq(SavePoint::getType, 1);
- this.remove(queryWrapper);
- } else {
- LambdaQueryWrapper<SavePoint> queryWrapper =
- new LambdaQueryWrapper<SavePoint>()
- .select(SavePoint::getTriggerTime)
- .eq(SavePoint::getAppId, entity.getAppId())
- .eq(SavePoint::getType, CheckPointType.CHECKPOINT.get())
- .orderByDesc(SavePoint::getTriggerTime);
-
- Page<SavePoint> savePointPage =
- this.baseMapper.selectPage(new Page<>(1, cpThreshold + 1),
queryWrapper);
- if (!savePointPage.getRecords().isEmpty()
- && savePointPage.getRecords().size() > cpThreshold) {
- SavePoint savePoint = savePointPage.getRecords().get(cpThreshold - 1);
- LambdaQueryWrapper<SavePoint> lambdaQueryWrapper =
- new LambdaQueryWrapper<SavePoint>()
- .eq(SavePoint::getAppId, entity.getAppId())
- .eq(SavePoint::getType, 1)
- .lt(SavePoint::getTriggerTime, savePoint.getTriggerTime());
- this.remove(lambdaQueryWrapper);
- }
- }
- }
-
@Override
public SavePoint getLatest(Long id) {
LambdaQueryWrapper<SavePoint> queryWrapper =
@@ -228,53 +143,25 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
Application application = applicationService.getById(appParam.getId());
// 1) properties have the highest priority, read the properties are set:
-Dstate.savepoints.dir
- String savepointPath =
-
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
- .get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+ String savepointPath =
getSavepointFromDynamicProps(application.getDynamicProperties());
+ if (StringUtils.isNotBlank(savepointPath)) {
+ return savepointPath;
+ }
// Application conf configuration has the second priority. If it is a
streampark|flinksql type
- // task,
- // see if Application conf is configured when the task is defined, if
checkpoints are configured
- // and enabled,
- // read `state.savepoints.dir`
- if (StringUtils.isBlank(savepointPath)) {
- if (application.isStreamParkJob() || application.isFlinkSqlJob()) {
- ApplicationConfig applicationConfig =
configService.getEffective(application.getId());
- if (applicationConfig != null) {
- Map<String, String> map = applicationConfig.readConfig();
- if (FlinkUtils.isCheckpointEnabled(map)) {
- savepointPath =
map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
- }
- }
- }
+ // task, see if Application conf is configured when the task is defined;
+ // if checkpoints are configured and enabled,
+ // read `state.savepoints.dir`.
+ savepointPath = getSavepointFromAppCfgIfStreamParkOrSQLJob(application);
+ if (StringUtils.isNotBlank(savepointPath)) {
+ return savepointPath;
}
// 3) If the savepoint is not obtained above, try to obtain the savepoint
path according to the
// deployment type (remote|on yarn)
- if (StringUtils.isBlank(savepointPath)) {
- // 3.1) At the remote mode, request the flink webui interface to get the
savepoint path
- if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
- FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
- Utils.notNull(
- cluster,
- String.format(
- "The clusterId=%s cannot be find, maybe the clusterId is wrong
or "
- + "the cluster has been deleted. Please contact the
Admin.",
- application.getFlinkClusterId()));
- Map<String, String> config = cluster.getFlinkConfig();
- if (!config.isEmpty()) {
- savepointPath =
config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
- }
- } else {
- // 3.2) At the yarn or k8s mode, then read the savepoint in
flink-conf.yml in the bound
- // flink
- FlinkEnv flinkEnv =
flinkEnvService.getById(application.getVersionId());
- savepointPath =
-
flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
- }
- }
-
- return savepointPath;
+ // 3.1) At the remote mode, request the flink webui interface to get the
savepoint path
+ // 3.2) At the yarn or k8s mode, then read the savepoint in flink-conf.yml
in the bound flink
+ return getSavepointFromDeployLayer(application);
}
@Override
@@ -299,22 +186,8 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
// infer savepoint
- String customSavepoint = this.getFinalSavepointDir(savepointPath,
application);
-
- FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
- String clusterId = getClusterId(application, cluster);
-
- Map<String, Object> properties = this.tryGetRestProps(application,
cluster);
-
TriggerSavepointRequest request =
- new TriggerSavepointRequest(
- flinkEnv.getFlinkVersion(),
- application.getExecutionModeEnum(),
- properties,
- clusterId,
- application.getJobId(),
- customSavepoint,
- application.getK8sNamespace());
+ renderTriggerSavepointRequest(savepointPath, application, flinkEnv);
CompletableFuture<SavepointResponse> savepointFuture =
CompletableFuture.supplyAsync(() ->
FlinkClient.triggerSavepoint(request), executorService);
@@ -322,6 +195,48 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
handleSavepointResponseFuture(application, applicationLog,
savepointFuture);
}
+  /**
+   * Deletes the savepoint record with the given id and, when the record has a non-empty path,
+   * the data stored at that path via the application's file-system operator.
+   *
+   * @param id primary key of the savepoint record.
+   * @param application application whose file-system operator is used to delete the data.
+   * @return the result of removing the record by id.
+   * @throws InternalException if no record exists for the id or if the deletion fails.
+   */
+  @Override
+  @Transactional(rollbackFor = Exception.class)
+  public Boolean delete(Long id, Application application) throws InternalException {
+    SavePoint savePoint = getById(id);
+    if (savePoint == null) {
+      // Previously this surfaced as an InternalException built from an NPE with a null message.
+      throw new InternalException(String.format("The savepoint id=%s cannot be found.", id));
+    }
+    try {
+      if (CommonUtils.notEmpty(savePoint.getPath())) {
+        application.getFsOperator().delete(savePoint.getPath());
+      }
+      return removeById(id);
+    } catch (Exception e) {
+      // Only the message is forwarded below, so log the cause here to keep its stack trace.
+      log.error("Failed to delete the savepoint id={}", id, e);
+      throw new InternalException(e.getMessage());
+    }
+  }
+
+  /**
+   * Queries one page of the savepoint records that belong to the application of the given
+   * savepoint. The pager is built from the request with "trigger_time" and {@code
+   * Constant.ORDER_DESC} passed as its sort arguments.
+   *
+   * @param savePoint carrier of the application id to filter by.
+   * @param request the paging request.
+   * @return the requested page of savepoint records.
+   */
+  @Override
+  public IPage<SavePoint> page(SavePoint savePoint, RestRequest request) {
+    MybatisPager<SavePoint> pager = new MybatisPager<>();
+    Page<SavePoint> targetPage = pager.getPage(request, "trigger_time", Constant.ORDER_DESC);
+
+    LambdaQueryWrapper<SavePoint> byAppId = new LambdaQueryWrapper<>();
+    byAppId.eq(SavePoint::getAppId, savePoint.getAppId());
+
+    return this.page(targetPage, byAppId);
+  }
+
+  /**
+   * Removes everything this service keeps for an application: first the savepoint records whose
+   * app id matches, then the path {@code APP_SAVEPOINTS + "/" + appId} through the application's
+   * file-system operator. A failure of the file-system deletion is logged and not rethrown.
+   *
+   * @param application the application being removed.
+   */
+  @Override
+  public void removeApp(Application application) {
+    final Long appId = application.getId();
+
+    // Records first.
+    this.remove(new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId));
+
+    // Then the stored data; this step is best-effort.
+    try {
+      application
+          .getFsOperator()
+          .delete(application.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString()));
+    } catch (Exception e) {
+      log.error(e.getMessage(), e);
+    }
+  }
+
+ // private methods.
+
private void handleSavepointResponseFuture(
Application application,
ApplicationLog applicationLog,
@@ -405,44 +320,182 @@ public class SavePointServiceImpl extends
ServiceImpl<SavePointMapper, SavePoint
return null;
}
- @Override
- @Transactional(rollbackFor = Exception.class)
- public Boolean delete(Long id, Application application) throws
InternalException {
- SavePoint savePoint = getById(id);
+ /**
+ * Try to get the savepoint config item from the dynamic properties.
+ *
+ * @param dynamicProps dynamic properties string.
+ * @return the value of the savepoint in the dynamic properties.
+ */
+ @VisibleForTesting
+ @Nullable
+ public String getSavepointFromDynamicProps(String dynamicProps) {
+ return
extractDynamicPropertiesAsJava(dynamicProps).get(SAVEPOINT_DIRECTORY.key());
+ }
+
+ /**
+ * Try to obtain the savepoint path If it is a streampark|flinksql type
task. See if Application
+ * conf is configured when the task is defined, if checkpoints are
configured and enabled, read
+ * `state.savepoints.dir`.
+ *
+ * @param application the target application.
+ * @return the value of the savepoint if existed.
+ */
+ @VisibleForTesting
+ @Nullable
+ public String getSavepointFromAppCfgIfStreamParkOrSQLJob(Application
application) {
+ if (!application.isStreamParkJob() && !application.isFlinkSqlJob()) {
+ return null;
+ }
+ ApplicationConfig applicationConfig =
configService.getEffective(application.getId());
+ if (applicationConfig == null) {
+ return null;
+ }
+ Map<String, String> map = applicationConfig.readConfig();
+ return FlinkUtils.isCheckpointEnabled(map) ?
map.get(SAVEPOINT_DIRECTORY.key()) : null;
+ }
+
+  /**
+   * Try to obtain the savepoint path according to the deployment type (remote|on yarn). In the
+   * remote mode, the path is read from the flink config of the bound flink cluster. In the yarn
+   * or k8s mode, it is read from the flink-conf.yml of the bound flink env.
+   *
+   * @param application the target application.
+   * @return the value of the savepoint if existed, otherwise null.
+   * @throws JsonProcessingException presumably raised while reading the flink config of the
+   *     cluster (declared by the original signature; confirm against FlinkCluster).
+   * @throws NullPointerException if the bound flink env (non-remote mode) or the bound flink
+   *     cluster (remote mode) cannot be found.
+   */
+  @VisibleForTesting
+  @Nullable
+  public String getSavepointFromDeployLayer(Application application)
+      throws JsonProcessingException {
+    // At the yarn or k8s mode, then read the savepoint in flink-conf.yml in the bound flink
+    if (!ExecutionMode.isRemoteMode(application.getExecutionMode())) {
+      FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
+      // Same failure type as before (a missing env already ended in an NPE), but with a message
+      // that names the missing id, consistent with the cluster check below.
+      Utils.notNull(
+          flinkEnv,
+          String.format(
+              "The flink env id=%s cannot be found, maybe the id is wrong or "
+                  + "the flink env has been deleted. Please contact the Admin.",
+              application.getVersionId()));
+      return flinkEnv.convertFlinkYamlAsMap().get(SAVEPOINT_DIRECTORY.key());
+    }
+
+    // At the remote mode, read the savepoint path from the config of the bound flink cluster
+    FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
+    Utils.notNull(
+        cluster,
+        String.format(
+            "The clusterId=%s cannot be find, maybe the clusterId is wrong or "
+                + "the cluster has been deleted. Please contact the Admin.",
+            application.getFlinkClusterId()));
+    Map<String, String> config = cluster.getFlinkConfig();
+    return config.isEmpty() ? null : config.get(SAVEPOINT_DIRECTORY.key());
+  }
+
+ /**
+ * Try to get the 'state.checkpoints.num-retained' value from the dynamic properties.
+ *
+ * @param dynamicProps dynamic properties string.
+ * @return the parsed value when it is an integer greater than 0, otherwise an empty Optional.
+ */
+ private Optional<Integer> tryGetChkNumRetainedFromDynamicProps(String
dynamicProps) {
+ String rawCfgValue =
+
extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key());
+ // A null or empty value is treated as "not configured" and produces no warning.
+ if (StringUtils.isEmpty(rawCfgValue)) {
+ return Optional.empty();
+ }
try {
- if (CommonUtils.notEmpty(savePoint.getPath())) {
- application.getFsOperator().delete(savePoint.getPath());
+ int value = Integer.parseInt(rawCfgValue.trim());
+ if (value > 0) {
+ return Optional.of(value);
}
- removeById(id);
- return true;
- } catch (Exception e) {
- throw new InternalException(e.getMessage());
+ // Parsed but not greater than 0: warn and fall through to the empty result.
+ log.warn(
+ "This value of dynamicProperties key: state.checkpoints.num-retained
is invalid, must be gt 0");
+ } catch (NumberFormatException e) {
+ // Not an integer (this includes a whitespace-only value after trim).
+ log.warn(
+ "This value of dynamicProperties key: state.checkpoints.num-retained
invalid, must be number");
}
+ return Optional.empty();
}
- @Override
- public IPage<SavePoint> page(SavePoint savePoint, RestRequest request) {
- Page<SavePoint> page =
- new MybatisPager<SavePoint>().getPage(request, "trigger_time",
Constant.ORDER_DESC);
- LambdaQueryWrapper<SavePoint> queryWrapper =
- new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId,
savePoint.getAppId());
- return this.page(page, queryWrapper);
+  /**
+   * Resolves 'state.checkpoints.num-retained' from the flink-conf.yaml of the given flink env.
+   * The default value of {@code MAX_RETAINED_CHECKPOINTS} is returned when the key is missing or
+   * empty, when the value is not an integer, or when it is not greater than 0.
+   *
+   * @param flinkEnv the flink env whose flink-conf.yaml is read.
+   * @param application the application, used for the log output only.
+   * @return the configured value, or the default value as described above.
+   */
+  private int getChkNumRetainedFromFlinkEnv(
+      @Nonnull FlinkEnv flinkEnv, @Nonnull Application application) {
+    String flinkConfNumRetained =
+        flinkEnv.convertFlinkYamlAsMap().get(MAX_RETAINED_CHECKPOINTS.key());
+    if (StringUtils.isEmpty(flinkConfNumRetained)) {
+      log.info(
+          "The application: {} is not set {} in dynamicProperties or value is invalid, and flink-conf.yaml is the same problem of flink env: {}, default value: {} will be use.",
+          application.getJobName(),
+          MAX_RETAINED_CHECKPOINTS.key(),
+          flinkEnv.getFlinkHome(),
+          MAX_RETAINED_CHECKPOINTS.defaultValue());
+      return MAX_RETAINED_CHECKPOINTS.defaultValue();
+    }
+    try {
+      int value = Integer.parseInt(flinkConfNumRetained.trim());
+      if (value > 0) {
+        return value;
+      }
+      log.warn(
+          "The value of key: state.checkpoints.num-retained in flink-conf.yaml is invalid, must be gt 0, default value: {} will be use",
+          MAX_RETAINED_CHECKPOINTS.defaultValue());
+    } catch (NumberFormatException e) {
+      // The second placeholder announces the default value, so pass the default rather than
+      // the unparsable raw value.
+      log.warn(
+          "The value of key: state.checkpoints.num-retained in flink-conf.yaml is invalid, must be number, flink env: {}, default value: {} will be use",
+          flinkEnv.getFlinkHome(),
+          MAX_RETAINED_CHECKPOINTS.defaultValue());
+    }
+    return MAX_RETAINED_CHECKPOINTS.defaultValue();
}
- @Override
- public void removeApp(Application application) {
- Long appId = application.getId();
+  /**
+   * Removes the checkpoint-type records of the entity's application that exceed the number of
+   * retained checkpoints. The number comes from the dynamic properties of the application and
+   * otherwise from the flink env. It is reduced by one when the entity itself is a checkpoint.
+   *
+   * @param entity the savepoint/checkpoint record that triggers the cleanup.
+   */
+  private void expire(SavePoint entity) {
+    FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
+    Application application = applicationService.getById(entity.getAppId());
+    Utils.notNull(flinkEnv);
+    Utils.notNull(application);
- LambdaQueryWrapper<SavePoint> queryWrapper =
- new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
- this.remove(queryWrapper);
+    // orElseGet keeps the flink env lookup lazy: with orElse the flink-conf.yaml was parsed and
+    // its "is not set ... in dynamicProperties" message could be logged even when the dynamic
+    // properties already provided a valid value.
+    int cpThreshold =
+        tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties())
+            .orElseGet(() -> getChkNumRetainedFromFlinkEnv(flinkEnv, application));
+    cpThreshold =
+        CHECKPOINT.equals(CheckPointType.of(entity.getType())) ? cpThreshold - 1 : cpThreshold;
- try {
- application
- .getFsOperator()
-
.delete(application.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString()));
- } catch (Exception e) {
- log.error(e.getMessage(), e);
+    // Nothing may be retained: drop every checkpoint record of the application.
+    if (cpThreshold == 0) {
+      LambdaQueryWrapper<SavePoint> queryWrapper =
+          new LambdaQueryWrapper<SavePoint>()
+              .eq(SavePoint::getAppId, entity.getAppId())
+              .eq(SavePoint::getType, CHECKPOINT.get());
+      this.remove(queryWrapper);
+      return;
+    }
+
+    // Fetch one record more than the threshold, newest first, to find out whether anything
+    // exceeds it.
+    LambdaQueryWrapper<SavePoint> queryWrapper =
+        new LambdaQueryWrapper<SavePoint>()
+            .select(SavePoint::getTriggerTime)
+            .eq(SavePoint::getAppId, entity.getAppId())
+            .eq(SavePoint::getType, CHECKPOINT.get())
+            .orderByDesc(SavePoint::getTriggerTime);
+
+    Page<SavePoint> savePointPage =
+        this.baseMapper.selectPage(new Page<>(1, cpThreshold + 1), queryWrapper);
+    if (CollectionUtils.isEmpty(savePointPage.getRecords())
+        || savePointPage.getRecords().size() <= cpThreshold) {
+      return;
}
+    // Remove every checkpoint record older than the last one that is still retained.
+    SavePoint savePoint = savePointPage.getRecords().get(cpThreshold - 1);
+    LambdaQueryWrapper<SavePoint> lambdaQueryWrapper =
+        new LambdaQueryWrapper<SavePoint>()
+            .eq(SavePoint::getAppId, entity.getAppId())
+            .eq(SavePoint::getType, CHECKPOINT.get())
+            .lt(SavePoint::getTriggerTime, savePoint.getTriggerTime());
+    this.remove(lambdaQueryWrapper);
+  }
+
+ @Nonnull
+ private TriggerSavepointRequest renderTriggerSavepointRequest(
+ @Nullable String savepointPath, Application application, FlinkEnv
flinkEnv) {
+ String customSavepoint = this.getFinalSavepointDir(savepointPath,
application);
+
+ FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
+ String clusterId = getClusterId(application, cluster);
+
+ Map<String, Object> properties = this.tryGetRestProps(application,
cluster);
+
+ return new TriggerSavepointRequest(
+ flinkEnv.getFlinkVersion(),
+ application.getExecutionModeEnum(),
+ properties,
+ clusterId,
+ application.getJobId(),
+ customSavepoint,
+ application.getK8sNamespace());
}
}
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
new file mode 100644
index 000000000..53133143b
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.common.enums.ApplicationType;
+import org.apache.streampark.common.enums.DevelopmentMode;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.console.SpringTestBase;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.ApplicationConfig;
+import org.apache.streampark.console.core.entity.Effective;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.enums.ConfigFileType;
+import org.apache.streampark.console.core.enums.EffectiveType;
+import org.apache.streampark.console.core.service.impl.SavePointServiceImpl;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import static
org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Test class for the implementation {@link
+ * org.apache.streampark.console.core.service.impl.SavePointServiceImpl} of
{@link
+ * SavePointService}.
+ */
+class SavePointServiceTest extends SpringTestBase {
+
+ @Autowired private SavePointService savePointService;
+
+ @Autowired private ApplicationConfigService configService;
+
+ @Autowired private EffectiveService effectiveService;
+
+ @Autowired private FlinkEnvService flinkEnvService;
+ @Autowired private FlinkClusterService flinkClusterService;
+ @Autowired ApplicationService applicationService;
+
+ /**
+ * Runs after each test and calls remove with an empty QueryWrapper on every service the tests
+ * write through, so that records saved by one test are not seen by the next one. NOTE(review):
+ * an empty wrapper presumably matches all rows; confirm against the MyBatis-Plus semantics.
+ */
+ @AfterEach
+ void cleanTestRecordsInDatabase() {
+ savePointService.remove(new QueryWrapper<>());
+ configService.remove(new QueryWrapper<>());
+ effectiveService.remove(new QueryWrapper<>());
+ flinkEnvService.remove(new QueryWrapper<>());
+ flinkClusterService.remove(new QueryWrapper<>());
+ applicationService.remove(new QueryWrapper<>());
+ }
+
+ /**
+ * Checks getSavepointFromDynamicProps for a null properties string, a configured savepoint
+ * directory and a savepoint directory key with an empty value.
+ *
+ * <p>This part will be migrated into the corresponding test cases about
+ * PropertiesUtils.extractDynamicPropertiesAsJava.
+ */
+ @Test
+ void testGetSavepointFromDynamicProps() {
+ String propsWithEmptyTargetValue = "-Dstate.savepoints.dir=";
+ String props = "-Dstate.savepoints.dir=hdfs:///test";
+ SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl)
savePointService;
+
+ // Null properties: no value is found.
+
assertThat(savePointServiceImpl.getSavepointFromDynamicProps(null)).isNull();
+ // Configured key: the configured path is returned.
+
assertThat(savePointServiceImpl.getSavepointFromDynamicProps(props)).isEqualTo("hdfs:///test");
+ // Key present with an empty value: an empty string (not null) is expected.
+
assertThat(savePointServiceImpl.getSavepointFromDynamicProps(propsWithEmptyTargetValue))
+ .isEmpty();
+ }
+
+ /**
+ * Walks getSavepointFromAppCfgIfStreamParkOrSQLJob through its null-returning cases and finally
+ * through the case where an effective config with a checkpointing interval and a savepoint
+ * directory exists.
+ */
+ @Test
+ void testGetSavepointFromAppCfgIfStreamParkOrSQLJob() {
+ SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl)
savePointService;
+ Application app = new Application();
+ Long appId = 1L;
+ Long appCfgId = 1L;
+ app.setId(appId);
+
+ // Test for non-(StreamPark job Or FlinkSQL job)
+ app.setAppType(ApplicationType.APACHE_FLINK.getType());
+
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+ app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
+ app.setJobType(DevelopmentMode.CUSTOM_CODE.getValue());
+
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+
+ // Test for (StreamPark job Or FlinkSQL job) without application config.
+ // NOTE(review): the two assertions below repeat the setup of the previous assertion.
+ app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
+
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+ app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
+ app.setJobType(DevelopmentMode.CUSTOM_CODE.getValue());
+
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+
+ // Test for (StreamPark job Or FlinkSQL job) with application config just
disabled checkpoint.
+ // NOTE(review): no Effective record has been saved at this point, so the null result may
+ // come from a missing effective config rather than from disabled checkpointing; confirm.
+ ApplicationConfig appCfg = new ApplicationConfig();
+ appCfg.setId(appCfgId);
+ appCfg.setAppId(appId);
+ appCfg.setContent("state.savepoints.dir=hdfs:///test");
+ appCfg.setFormat(ConfigFileType.PROPERTIES.getValue());
+ configService.save(appCfg);
+
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+
+ // Test for (StreamPark job or FlinkSQL job) with application config and
enabled checkpoint and
+ // configured value.
+
+ // Test for non-value for CHECKPOINTING_INTERVAL
+ appCfg.setContent("");
+ configService.updateById(appCfg);
+
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+
+ // Test for configured CHECKPOINTING_INTERVAL
+ // The content is stored zipped here and an Effective record is linked to the config.
+ appCfg.setContent(
+ DeflaterUtils.zipString(
+ "state.savepoints.dir=hdfs:///test\n"
+ + String.format("%s=%s", CHECKPOINTING_INTERVAL.key(),
"3min")));
+ configService.updateById(appCfg);
+ Effective effective = new Effective();
+ effective.setTargetId(appCfg.getId());
+ effective.setAppId(appId);
+ effective.setTargetType(EffectiveType.CONFIG.getType());
+ effectiveService.save(effective);
+
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app))
+ .isEqualTo("hdfs:///test");
+ }
+
+ /**
+ * Checks getSavepointFromDeployLayer for a yarn-application job whose flink env config holds a
+ * savepoint directory, and for a remote job whose flink cluster record does not exist.
+ */
+ @Test
+ void testGetSavepointFromDeployLayer() throws JsonProcessingException {
+ SavePointServiceImpl savePointServiceImpl = (SavePointServiceImpl)
savePointService;
+ Long appId = 1L;
+ Long idOfFlinkEnv = 1L;
+ Long teamId = 1L;
+ Application application = new Application();
+ application.setId(appId);
+ application.setTeamId(teamId);
+ application.setVersionId(idOfFlinkEnv);
+ application.setExecutionMode(ExecutionMode.YARN_APPLICATION.getMode());
+ applicationService.save(application);
+
+ // The flink env is saved with the id the application refers to via its version id.
+ FlinkEnv flinkEnv = new FlinkEnv();
+ flinkEnv.setFlinkName("mockFlinkName");
+ flinkEnv.setFlinkHome("/tmp");
+ flinkEnv.setId(idOfFlinkEnv);
+ flinkEnv.setVersion("1.15.3");
+ flinkEnv.setScalaVersion("2.12");
+ flinkEnv.setFlinkConf(DeflaterUtils.zipString(SAVEPOINT_DIRECTORY.key() +
": hdfs:///test"));
+ flinkEnvService.save(flinkEnv);
+
+ // Test for non-remote mode
+ assertThat(savePointServiceImpl.getSavepointFromDeployLayer(application))
+ .isEqualTo("hdfs:///test");
+
+ // Start the test lines for remote mode
+ Long clusterId = 1L;
+
+ // Test for it without cluster.
+ // No cluster record is saved in this test, so the lookup is expected to fail with an NPE.
+ application.setExecutionMode(ExecutionMode.REMOTE.getMode());
+ application.setFlinkClusterId(clusterId);
+ assertThatThrownBy(() ->
savePointServiceImpl.getSavepointFromDeployLayer(application))
+ .isInstanceOf(NullPointerException.class);
+
+ // TODO: the remaining remote-mode cases are not covered yet:
+ // Test for it with empty config
+ // Test for it with the configured empty target value
+ // Test for it with the configured non-empty target value
+
+ }
+}