This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new b750d8b89 [Improve] flink checkpointing conf key minor improvement 
(#4004)
b750d8b89 is described below

commit b750d8b89572fccdefc6f978394b57d0c608a272
Author: benjobs <[email protected]>
AuthorDate: Thu Aug 29 13:45:50 2024 +0800

    [Improve] flink checkpointing conf key minor improvement (#4004)
    
    * [Improve] flink checkpointing conf key minor improvement
    * [Improve] code style minor improvement
---
 .../core/service/impl/SavepointServiceImpl.java    | 42 +++++++++++++++++++---
 1 file changed, 37 insertions(+), 5 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index 856858b87..cf3c630e1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -95,6 +95,11 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
 
   @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
+  private static final String SAVEPOINT_DIRECTORY_NEW_KEY = 
"execution.checkpointing.dir";
+
+  private static final String MAX_RETAINED_CHECKPOINTS_NEW_KEY =
+      "execution.checkpointing.num-retained";
+
   private static final int CPU_NUM = Math.max(2, 
Runtime.getRuntime().availableProcessors());
 
   private final ExecutorService flinkTriggerExecutor =
@@ -133,6 +138,13 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
         
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
             .get(numRetainedKey);
 
+    if (StringUtils.isBlank(numRetainedFromDynamicProp)) {
+      // for flink 1.20
+      numRetainedFromDynamicProp =
+          
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
+              .get(MAX_RETAINED_CHECKPOINTS_NEW_KEY);
+    }
+
     int cpThreshold = 0;
     if (numRetainedFromDynamicProp != null) {
       try {
@@ -141,17 +153,17 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
           cpThreshold = value;
         } else {
           log.warn(
-              "this value of dynamicProperties key: 
state.checkpoints.num-retained is invalid, must be gt 0");
+              "this value of dynamicProperties key: 
state.checkpoints.num-retained or execution.checkpointing.num-retained is 
invalid, must be gt 0");
         }
       } catch (NumberFormatException e) {
         log.warn(
-            "this value of dynamicProperties key: 
state.checkpoints.num-retained invalid, must be number");
+            "this value of dynamicProperties key: 
state.checkpoints.num-retained or execution.checkpointing.num-retained invalid, 
must be number");
       }
     }
 
     if (cpThreshold == 0) {
       String flinkConfNumRetained = 
flinkEnv.getFlinkConfig().getProperty(numRetainedKey);
-      int numRetainedDefaultValue = 
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
+      int numRetainedDefaultValue = 1;
       if (flinkConfNumRetained != null) {
         try {
           int value = Integer.parseInt(flinkConfNumRetained.trim());
@@ -160,13 +172,13 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
           } else {
             cpThreshold = numRetainedDefaultValue;
             log.warn(
-                "the value of key: state.checkpoints.num-retained in 
flink-conf.yaml is invalid, must be gt 0, default value: {} will be use",
+                "the value of key: state.checkpoints.num-retained or 
execution.checkpointing.num-retained in flink-conf.yaml is invalid, must be gt 
0, default value: {} will be use",
                 numRetainedDefaultValue);
           }
         } catch (NumberFormatException e) {
           cpThreshold = numRetainedDefaultValue;
           log.warn(
-              "the value of key: state.checkpoints.num-retained in 
flink-conf.yaml is invalid, must be number, flink env: {}, default value: {} 
will be use",
+              "the value of key: state.checkpoints.num-retained or 
execution.checkpointing.num-retained in flink-conf.yaml is invalid, must be 
number, flink env: {}, default value: {} will be use",
               flinkEnv.getFlinkHome(),
               flinkConfNumRetained);
         }
@@ -232,6 +244,13 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
         
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
             .get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
 
+    if (StringUtils.isBlank(savepointPath)) {
+      // for flink 1.20
+      savepointPath =
+          
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
+              .get(SAVEPOINT_DIRECTORY_NEW_KEY);
+    }
+
     // Application conf configuration has the second priority. If it is a 
streampark|flinksql type
     // task,
     // see if Application conf is configured when the task is defined, if 
checkpoints are configured
@@ -244,6 +263,10 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
           Map<String, String> map = applicationConfig.readConfig();
           if (FlinkUtils.isCheckpointEnabled(map)) {
             savepointPath = 
map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+            if (StringUtils.isBlank(savepointPath)) {
+              // for flink 1.20
+              savepointPath = map.get(SAVEPOINT_DIRECTORY_NEW_KEY);
+            }
           }
         }
       }
@@ -264,6 +287,10 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
         Map<String, String> config = cluster.getFlinkConfig();
         if (!config.isEmpty()) {
           savepointPath = 
config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+          if (StringUtils.isBlank(savepointPath)) {
+            // for flink 1.20
+            savepointPath = config.get(SAVEPOINT_DIRECTORY_NEW_KEY);
+          }
         }
       }
     }
@@ -274,6 +301,11 @@ public class SavepointServiceImpl extends 
ServiceImpl<SavepointMapper, Savepoint
       FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
       savepointPath =
           
flinkEnv.getFlinkConfig().getProperty(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+
+      if (StringUtils.isBlank(savepointPath)) {
+        // for flink 1.20
+        savepointPath = 
flinkEnv.getFlinkConfig().getProperty(SAVEPOINT_DIRECTORY_NEW_KEY);
+      }
     }
 
     return savepointPath;

Reply via email to