This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 5ff209809 [Improve] cp/sp path improvement (#4027)
5ff209809 is described below

commit 5ff20980985223dcafc0b0ff778d587fa27c24aa
Author: benjobs <[email protected]>
AuthorDate: Tue Sep 3 18:42:54 2024 +0800

    [Improve] cp/sp path improvement (#4027)
---
 .../console/core/service/SavepointService.java     |  2 +
 .../core/service/impl/ApplicationServiceImpl.java  | 25 +++++----
 .../core/service/impl/SavepointServiceImpl.java    | 62 ++++++++++++----------
 3 files changed, 52 insertions(+), 37 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
index 60cedc978..2c31d77a3 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SavepointService.java
@@ -42,4 +42,6 @@ public interface SavepointService extends IService<Savepoint> {
   void removeApp(Application application);
 
   String getSavePointPath(Application app) throws Exception;
+
+  String processPath(String path, String jobName, Long jobId);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 51246f5ee..ce5c86ed1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1336,7 +1336,11 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     String customSavepoint = null;
     if (appParam.getRestoreOrTriggerSavepoint()) {
       customSavepoint = appParam.getSavepointPath();
-      if (StringUtils.isBlank(customSavepoint)) {
+      if (customSavepoint == null) {
+        customSavepoint =
+            savepointService.processPath(
+                customSavepoint, application.getJobName(), application.getId());
+      } else {
         customSavepoint = savepointService.getSavePointPath(appParam);
       }
     }
@@ -1451,23 +1455,23 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
       final String scheme = uri.getScheme();
       final String pathPart = uri.getPath();
       if (scheme == null) {
-        return "This state.savepoints.dir value "
+        return "This state savepoint dir value "
             + savepointPath
             + " scheme (hdfs://, file://, etc) of  is null. Please specify the file system scheme explicitly in the URI.";
       }
       if (pathPart == null) {
-        return "This state.savepoints.dir value "
+        return "This state savepoint dir value "
             + savepointPath
             + " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
       }
       if (pathPart.isEmpty() || "/".equals(pathPart)) {
-        return "This state.savepoints.dir value "
+        return "This state savepoint dir value "
             + savepointPath
             + " Cannot use the root directory for checkpoints.";
       }
       return null;
     } else {
-      return "When custom savepoint is not set, state.savepoints.dir needs to be set in properties or flink-conf.yaml of application";
+      return "When a custom savepoint is not set, state.savepoints.dir or execution.checkpointing.savepoint-dir needs to be configured in the properties or flink-conf.yaml of the application.";
     }
   }
 
@@ -1660,7 +1664,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             application.getJobName(),
             appConf,
             application.getApplicationType(),
-            getSavepointPath(appParam),
+            getSavepointPath(appParam, application.getJobName(), application.getId()),
             applicationArgs,
             buildResult,
             extraParameter,
@@ -1877,19 +1881,20 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     return false;
   }
 
-  private String getSavepointPath(Application appParam) {
+  private String getSavepointPath(Application appParam, String jobName, Long jobId) {
+    String path = null;
     if (appParam.getRestoreOrTriggerSavepoint() != null
         && appParam.getRestoreOrTriggerSavepoint()) {
       if (StringUtils.isBlank(appParam.getSavepointPath())) {
         Savepoint savepoint = savepointService.getLatest(appParam.getId());
         if (savepoint != null) {
-          return savepoint.getPath();
+          path = savepoint.getPath();
         }
       } else {
-        return appParam.getSavepointPath();
+        path = appParam.getSavepointPath();
       }
     }
-    return null;
+    return savepointService.processPath(path, jobName, jobId);
   }
 
   /**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index ead9934a8..61b025b01 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -70,6 +70,7 @@ import java.net.URI;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -95,7 +96,7 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
 
   @Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
 
-  private static final String SAVEPOINT_DIRECTORY_NEW_KEY = "execution.checkpointing.dir";
+  private static final String SAVEPOINT_DIRECTORY_NEW_KEY = "execution.checkpointing.savepoint-dir";
 
   private static final String MAX_RETAINED_CHECKPOINTS_NEW_KEY =
       "execution.checkpointing.num-retained";
@@ -244,16 +245,13 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
     Application application = applicationService.getById(appParam.getId());
 
     // 1) properties have the highest priority, read the properties are set: -Dstate.savepoints.dir
-    String savepointPath =
-        PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
-            .get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+    Map<String, String> properties =
+        PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties());
 
-    if (StringUtils.isBlank(savepointPath)) {
-      // for flink 1.20
-      savepointPath =
-          PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties())
-              .get(SAVEPOINT_DIRECTORY_NEW_KEY);
-    }
+    String savepointPath =
+        properties.getOrDefault(
+            CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+            properties.get(SAVEPOINT_DIRECTORY_NEW_KEY));
 
     // Application conf configuration has the second priority. If it is a streampark|flinksql type
     // task,
@@ -266,11 +264,10 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
         if (applicationConfig != null) {
           Map<String, String> map = applicationConfig.readConfig();
           if (FlinkUtils.isCheckpointEnabled(map)) {
-            savepointPath = map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-            if (StringUtils.isBlank(savepointPath)) {
-              // for flink 1.20
-              savepointPath = map.get(SAVEPOINT_DIRECTORY_NEW_KEY);
-            }
+            savepointPath =
+                map.getOrDefault(
+                    CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+                    map.get(SAVEPOINT_DIRECTORY_NEW_KEY));
           }
         }
       }
@@ -290,11 +287,10 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
                 application.getFlinkClusterId()));
         Map<String, String> config = cluster.getFlinkConfig();
         if (!config.isEmpty()) {
-          savepointPath = config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-          if (StringUtils.isBlank(savepointPath)) {
-            // for flink 1.20
-            savepointPath = config.get(SAVEPOINT_DIRECTORY_NEW_KEY);
-          }
+          savepointPath =
+              config.getOrDefault(
+                  CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+                  config.get(SAVEPOINT_DIRECTORY_NEW_KEY));
         }
       }
     }
@@ -303,16 +299,25 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
     if (StringUtils.isBlank(savepointPath)) {
       // flink
       FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
+      Properties flinkConfig = flinkEnv.getFlinkConfig();
       savepointPath =
-          flinkEnv.getFlinkConfig().getProperty(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
-
-      if (StringUtils.isBlank(savepointPath)) {
-        // for flink 1.20
-        savepointPath = flinkEnv.getFlinkConfig().getProperty(SAVEPOINT_DIRECTORY_NEW_KEY);
-      }
+          flinkConfig.getProperty(
+              CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+              flinkConfig.getProperty(SAVEPOINT_DIRECTORY_NEW_KEY));
     }
 
-    return savepointPath;
+    return processPath(savepointPath, application.getJobName(), application.getId());
+  }
+
+  @Override
+  public String processPath(String path, String jobName, Long jobId) {
+    if (StringUtils.isNotBlank(path)) {
+      return path.replaceAll("\\$job(Id|id)", jobId.toString())
+          .replaceAll("\\$\\{job(Id|id)}", jobId.toString())
+          .replaceAll("\\$job(Name|name)", jobName)
+          .replaceAll("\\$\\{job(Name|name)}", jobName);
+    }
+    return path;
   }
 
   @Override
@@ -340,6 +345,9 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
 
     // infer savepoint
     String customSavepoint = this.getFinalSavepointDir(savepointPath, application);
+    if (StringUtils.isNotBlank(customSavepoint)) {
+      customSavepoint = processPath(customSavepoint, application.getJobName(), application.getId());
+    }
 
     FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
     String clusterId = getClusterId(application, cluster);

Reply via email to