This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch sync_2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/sync_2.1.3 by this push:
new dce81abc2 [Improve] method name improvements
dce81abc2 is described below
commit dce81abc2ccf1fc2c26f22ee80e75b456ee74b4e
Author: benjobs <[email protected]>
AuthorDate: Wed Mar 13 17:12:05 2024 +0800
[Improve] method name improvements
---
.../console/core/service/impl/SavePointServiceImpl.java | 5 +++--
.../console/core/service/SavePointServiceTest.java | 14 +++++++-------
2 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
index 2a570ba6b..7fab0fc92 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.ExceptionUtils;
@@ -146,7 +147,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
// task, see if Application conf is configured when the task is defined, if checkpoints are
// configured
// and enabled, read `state.savepoints.dir`
- savepointPath = getSavepointFromAppCfgIfStreamParkOrSQLJob(application);
+ savepointPath = getSavepointFromConfig(application);
if (StringUtils.isNotBlank(savepointPath)) {
return savepointPath;
}
@@ -340,7 +341,7 @@ public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint
*/
@VisibleForTesting
@Nullable
- public String getSavepointFromAppCfgIfStreamParkOrSQLJob(Application application) {
+ public String getSavepointFromConfig(Application application) {
if (!application.isStreamParkJob() && !application.isFlinkSqlJob()) {
return null;
}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
index e56d76f6e..256c95823 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
@@ -95,17 +95,17 @@ class SavePointServiceTest extends SpringUnitTestBase {
// Test for non-(StreamPark job Or FlinkSQL job)
app.setAppType(ApplicationType.APACHE_FLINK.getType());
- assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+ assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
app.setJobType(FlinkDevelopmentMode.CUSTOM_CODE.getMode());
- assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+ assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
// Test for (StreamPark job Or FlinkSQL job) without application config.
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
- assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+ assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
app.setJobType(FlinkDevelopmentMode.CUSTOM_CODE.getMode());
- assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+ assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
// Test for (StreamPark job Or FlinkSQL job) with application config just disabled checkpoint.
ApplicationConfig appCfg = new ApplicationConfig();
@@ -114,7 +114,7 @@ class SavePointServiceTest extends SpringUnitTestBase {
appCfg.setContent("state.savepoints.dir=hdfs:///test");
appCfg.setFormat(ConfigFileTypeEnum.PROPERTIES.getValue());
configService.save(appCfg);
- assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+ assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
// Test for (StreamPark job or FlinkSQL job) with application config and enabled checkpoint and
// configured value.
@@ -122,7 +122,7 @@ class SavePointServiceTest extends SpringUnitTestBase {
// Test for non-value for CHECKPOINTING_INTERVAL
appCfg.setContent("");
configService.updateById(appCfg);
- assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
+ assertThat(savePointServiceImpl.getSavepointFromConfig(app)).isNull();
// Test for configured CHECKPOINTING_INTERVAL
appCfg.setContent(
@@ -135,7 +135,7 @@ class SavePointServiceTest extends SpringUnitTestBase {
effective.setAppId(appId);
effective.setTargetType(EffectiveTypeEnum.CONFIG.getType());
effectiveService.save(effective);
- assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app))
+ assertThat(savePointServiceImpl.getSavepointFromConfig(app))
.isEqualTo("hdfs:///test");
}