This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new b750d8b89 [Improve] flink checkpointing conf key minor improvement
(#4004)
b750d8b89 is described below
commit b750d8b89572fccdefc6f978394b57d0c608a272
Author: benjobs <[email protected]>
AuthorDate: Thu Aug 29 13:45:50 2024 +0800
[Improve] flink checkpointing conf key minor improvement (#4004)
* [Improve] flink checkpointing conf key minor improvement
* [Improve] code style minor improvement
---
.../core/service/impl/SavepointServiceImpl.java | 42 +++++++++++++++++++---
1 file changed, 37 insertions(+), 5 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index 856858b87..cf3c630e1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -95,6 +95,11 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
@Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
+ private static final String SAVEPOINT_DIRECTORY_NEW_KEY =
"execution.checkpointing.dir";
+
+ private static final String MAX_RETAINED_CHECKPOINTS_NEW_KEY =
+ "execution.checkpointing.num-retained";
+
private static final int CPU_NUM = Math.max(2,
Runtime.getRuntime().availableProcessors());
private final ExecutorService flinkTriggerExecutor =
@@ -133,6 +138,13 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
.get(numRetainedKey);
+ if (StringUtils.isBlank(numRetainedFromDynamicProp)) {
+ // for flink 1.20
+ numRetainedFromDynamicProp =
+
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
+ .get(MAX_RETAINED_CHECKPOINTS_NEW_KEY);
+ }
+
int cpThreshold = 0;
if (numRetainedFromDynamicProp != null) {
try {
@@ -141,17 +153,17 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
cpThreshold = value;
} else {
log.warn(
- "this value of dynamicProperties key:
state.checkpoints.num-retained is invalid, must be gt 0");
+ "this value of dynamicProperties key:
state.checkpoints.num-retained or execution.checkpointing.num-retained is
invalid, must be gt 0");
}
} catch (NumberFormatException e) {
log.warn(
- "this value of dynamicProperties key:
state.checkpoints.num-retained invalid, must be number");
+ "this value of dynamicProperties key:
state.checkpoints.num-retained or execution.checkpointing.num-retained invalid,
must be number");
}
}
if (cpThreshold == 0) {
String flinkConfNumRetained =
flinkEnv.getFlinkConfig().getProperty(numRetainedKey);
- int numRetainedDefaultValue =
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
+ int numRetainedDefaultValue = 1;
if (flinkConfNumRetained != null) {
try {
int value = Integer.parseInt(flinkConfNumRetained.trim());
@@ -160,13 +172,13 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
} else {
cpThreshold = numRetainedDefaultValue;
log.warn(
- "the value of key: state.checkpoints.num-retained in
flink-conf.yaml is invalid, must be gt 0, default value: {} will be use",
+ "the value of key: state.checkpoints.num-retained or
execution.checkpointing.num-retained in flink-conf.yaml is invalid, must be gt
0, default value: {} will be use",
numRetainedDefaultValue);
}
} catch (NumberFormatException e) {
cpThreshold = numRetainedDefaultValue;
log.warn(
- "the value of key: state.checkpoints.num-retained in
flink-conf.yaml is invalid, must be number, flink env: {}, default value: {}
will be use",
+ "the value of key: state.checkpoints.num-retained or
execution.checkpointing.num-retained in flink-conf.yaml is invalid, must be
number, flink env: {}, default value: {} will be use",
flinkEnv.getFlinkHome(),
flinkConfNumRetained);
}
@@ -232,6 +244,13 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+ if (StringUtils.isBlank(savepointPath)) {
+ // for flink 1.20
+ savepointPath =
+
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
+ .get(SAVEPOINT_DIRECTORY_NEW_KEY);
+ }
+
// Application conf configuration has the second priority. If it is a
streampark|flinksql type
// task,
// see if Application conf is configured when the task is defined, if
checkpoints are configured
@@ -244,6 +263,10 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
Map<String, String> map = applicationConfig.readConfig();
if (FlinkUtils.isCheckpointEnabled(map)) {
savepointPath =
map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+ if (StringUtils.isBlank(savepointPath)) {
+ // for flink 1.20
+ savepointPath = map.get(SAVEPOINT_DIRECTORY_NEW_KEY);
+ }
}
}
}
@@ -264,6 +287,10 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
Map<String, String> config = cluster.getFlinkConfig();
if (!config.isEmpty()) {
savepointPath =
config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+ if (StringUtils.isBlank(savepointPath)) {
+ // for flink 1.20
+ savepointPath = config.get(SAVEPOINT_DIRECTORY_NEW_KEY);
+ }
}
}
}
@@ -274,6 +301,11 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
savepointPath =
flinkEnv.getFlinkConfig().getProperty(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+
+ if (StringUtils.isBlank(savepointPath)) {
+ // for flink 1.20
+ savepointPath =
flinkEnv.getFlinkConfig().getProperty(SAVEPOINT_DIRECTORY_NEW_KEY);
+ }
}
return savepointPath;