This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 5ff209809 [Improve] cp/sp path improvement (#4027)
5ff209809 is described below
commit 5ff20980985223dcafc0b0ff778d587fa27c24aa
Author: benjobs <[email protected]>
AuthorDate: Tue Sep 3 18:42:54 2024 +0800
[Improve] cp/sp path improvement (#4027)
---
.../console/core/service/SavepointService.java | 2 +
.../core/service/impl/ApplicationServiceImpl.java | 25 +++++----
.../core/service/impl/SavepointServiceImpl.java | 62 ++++++++++++----------
3 files changed, 52 insertions(+), 37 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
index 60cedc978..2c31d77a3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
@@ -42,4 +42,6 @@ public interface SavepointService extends IService<Savepoint>
{
void removeApp(Application application);
String getSavePointPath(Application app) throws Exception;
+
+ String processPath(String path, String jobName, Long jobId);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 51246f5ee..ce5c86ed1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1336,7 +1336,11 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
String customSavepoint = null;
if (appParam.getRestoreOrTriggerSavepoint()) {
customSavepoint = appParam.getSavepointPath();
- if (StringUtils.isBlank(customSavepoint)) {
+ if (customSavepoint == null) {
+ customSavepoint =
+ savepointService.processPath(
+ customSavepoint, application.getJobName(),
application.getId());
+ } else {
customSavepoint = savepointService.getSavePointPath(appParam);
}
}
@@ -1451,23 +1455,23 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
final String scheme = uri.getScheme();
final String pathPart = uri.getPath();
if (scheme == null) {
- return "This state.savepoints.dir value "
+ return "This state savepoint dir value "
+ savepointPath
+ " scheme (hdfs://, file://, etc) of is null. Please specify the
file system scheme explicitly in the URI.";
}
if (pathPart == null) {
- return "This state.savepoints.dir value "
+ return "This state savepoint dir value "
+ savepointPath
+ " path part to store the checkpoint data in is null. Please
specify a directory path for the checkpoint data.";
}
if (pathPart.isEmpty() || "/".equals(pathPart)) {
- return "This state.savepoints.dir value "
+ return "This state savepoint dir value "
+ savepointPath
+ " Cannot use the root directory for checkpoints.";
}
return null;
} else {
- return "When custom savepoint is not set, state.savepoints.dir needs to
be set in properties or flink-conf.yaml of application";
+ return "When a custom savepoint is not set, state.savepoints.dir or
execution.checkpointing.savepoint-dir needs to be configured in the properties
or flink-conf.yaml of the application.";
}
}
@@ -1660,7 +1664,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.getJobName(),
appConf,
application.getApplicationType(),
- getSavepointPath(appParam),
+ getSavepointPath(appParam, application.getJobName(),
application.getId()),
applicationArgs,
buildResult,
extraParameter,
@@ -1877,19 +1881,20 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
return false;
}
- private String getSavepointPath(Application appParam) {
+ private String getSavepointPath(Application appParam, String jobName, Long
jobId) {
+ String path = null;
if (appParam.getRestoreOrTriggerSavepoint() != null
&& appParam.getRestoreOrTriggerSavepoint()) {
if (StringUtils.isBlank(appParam.getSavepointPath())) {
Savepoint savepoint = savepointService.getLatest(appParam.getId());
if (savepoint != null) {
- return savepoint.getPath();
+ path = savepoint.getPath();
}
} else {
- return appParam.getSavepointPath();
+ path = appParam.getSavepointPath();
}
}
- return null;
+ return savepointService.processPath(path, jobName, jobId);
}
/**
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index ead9934a8..61b025b01 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -70,6 +70,7 @@ import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -95,7 +96,7 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
@Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
- private static final String SAVEPOINT_DIRECTORY_NEW_KEY =
"execution.checkpointing.dir";
+ private static final String SAVEPOINT_DIRECTORY_NEW_KEY =
"execution.checkpointing.savepoint-dir";
private static final String MAX_RETAINED_CHECKPOINTS_NEW_KEY =
"execution.checkpointing.num-retained";
@@ -244,16 +245,13 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
Application application = applicationService.getById(appParam.getId());
// 1) properties have the highest priority, read the properties are set:
-Dstate.savepoints.dir
- String savepointPath =
-
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
- .get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+ Map<String, String> properties =
+
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties());
- if (StringUtils.isBlank(savepointPath)) {
- // for flink 1.20
- savepointPath =
-
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
- .get(SAVEPOINT_DIRECTORY_NEW_KEY);
- }
+ String savepointPath =
+ properties.getOrDefault(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+ properties.get(SAVEPOINT_DIRECTORY_NEW_KEY));
// Application conf configuration has the second priority. If it is a
streampark|flinksql type
// task,
@@ -266,11 +264,10 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
if (applicationConfig != null) {
Map<String, String> map = applicationConfig.readConfig();
if (FlinkUtils.isCheckpointEnabled(map)) {
- savepointPath =
map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
- if (StringUtils.isBlank(savepointPath)) {
- // for flink 1.20
- savepointPath = map.get(SAVEPOINT_DIRECTORY_NEW_KEY);
- }
+ savepointPath =
+ map.getOrDefault(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+ map.get(SAVEPOINT_DIRECTORY_NEW_KEY));
}
}
}
@@ -290,11 +287,10 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
application.getFlinkClusterId()));
Map<String, String> config = cluster.getFlinkConfig();
if (!config.isEmpty()) {
- savepointPath =
config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
- if (StringUtils.isBlank(savepointPath)) {
- // for flink 1.20
- savepointPath = config.get(SAVEPOINT_DIRECTORY_NEW_KEY);
- }
+ savepointPath =
+ config.getOrDefault(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+ config.get(SAVEPOINT_DIRECTORY_NEW_KEY));
}
}
}
@@ -303,16 +299,25 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
if (StringUtils.isBlank(savepointPath)) {
// flink
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
+ Properties flinkConfig = flinkEnv.getFlinkConfig();
savepointPath =
-
flinkEnv.getFlinkConfig().getProperty(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-
- if (StringUtils.isBlank(savepointPath)) {
- // for flink 1.20
- savepointPath =
flinkEnv.getFlinkConfig().getProperty(SAVEPOINT_DIRECTORY_NEW_KEY);
- }
+ flinkConfig.getProperty(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+ flinkConfig.getProperty(SAVEPOINT_DIRECTORY_NEW_KEY));
}
- return savepointPath;
+ return processPath(savepointPath, application.getJobName(),
application.getId());
+ }
+
+ /**
+  * Expands job placeholders in a savepoint/checkpoint path.
+  *
+  * <p>The placeholders {@code $jobId}, {@code $jobid}, {@code ${jobId}} and {@code ${jobid}} are
+  * replaced with {@code jobId}; {@code $jobName}, {@code $jobname}, {@code ${jobName}} and
+  * {@code ${jobname}} are replaced with {@code jobName}. The values are inserted literally.
+  *
+  * @param path the raw path; a null or blank path is returned unchanged
+  * @param jobName the job name substituted for the name placeholders
+  * @param jobId the job id substituted for the id placeholders; must be non-null when
+  *     {@code path} is not blank
+  * @return the path with all placeholders expanded, or {@code path} itself when it is blank
+  */
+ @Override
+ public String processPath(String path, String jobName, Long jobId) {
+   if (StringUtils.isNotBlank(path)) {
+     // String.replaceAll treats '$' and '\' in the replacement as group references/escapes,
+     // so a job name such as "etl$1" would throw or corrupt the path unless it is quoted.
+     String id = java.util.regex.Matcher.quoteReplacement(jobId.toString());
+     // A null name is passed through untouched so paths without a name placeholder behave
+     // exactly as before.
+     String name = jobName == null ? null : java.util.regex.Matcher.quoteReplacement(jobName);
+     return path.replaceAll("\\$job(Id|id)", id)
+         .replaceAll("\\$\\{job(Id|id)}", id)
+         .replaceAll("\\$job(Name|name)", name)
+         .replaceAll("\\$\\{job(Name|name)}", name);
+   }
+   return path;
}
@Override
@@ -340,6 +345,9 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
// infer savepoint
String customSavepoint = this.getFinalSavepointDir(savepointPath,
application);
+ if (StringUtils.isNotBlank(customSavepoint)) {
+ customSavepoint = processPath(customSavepoint, application.getJobName(),
application.getId());
+ }
FlinkCluster cluster =
flinkClusterService.getById(application.getFlinkClusterId());
String clusterId = getClusterId(application, cluster);