wolfboys commented on code in PR #2268:
URL: 
https://github.com/apache/incubator-streampark/pull/2268#discussion_r1112021616


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java:
##########
@@ -171,6 +220,184 @@ public SavePoint getLatest(Long id) {
     return this.getOne(queryWrapper);
   }
 
+  @Override
+  public String getSavePointPath(Application appParam) throws Exception {
+    Application application = applicationService.getById(appParam.getId());
+
+    // 1) properties have the highest priority, read the properties are set: 
-Dstate.savepoints.dir
+    String savepointPath =
+        
FlinkClient.extractDynamicPropertiesAsJava(application.getDynamicProperties())
+            .get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+
+    // Application conf configuration has the second priority. If it is a 
streampark|flinksql type
+    // task,
+    // see if Application conf is configured when the task is defined, if 
checkpoints are configured
+    // and enabled,
+    // read `state.savepoints.dir`
+    if (StringUtils.isBlank(savepointPath)) {
+      if (application.isStreamParkJob() || application.isFlinkSqlJob()) {
+        ApplicationConfig applicationConfig = 
configService.getEffective(application.getId());
+        if (applicationConfig != null) {
+          Map<String, String> map = applicationConfig.readConfig();
+          if (FlinkUtils.isCheckpointEnabled(map)) {
+            savepointPath = 
map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+          }
+        }
+      }
+    }
+
+    // 3) If the savepoint is not obtained above, try to obtain the savepoint 
path according to the
+    // deployment type (remote|on yarn)
+    if (StringUtils.isBlank(savepointPath)) {
+      // 3.1) At the remote mode, request the flink webui interface to get the 
savepoint path
+      if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
+        FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+        Utils.notNull(
+            cluster,
+            String.format(
+                "The clusterId=%s cannot be find, maybe the clusterId is wrong 
or "
+                    + "the cluster has been deleted. Please contact the 
Admin.",
+                application.getFlinkClusterId()));
+        Map<String, String> config = cluster.getFlinkConfig();
+        if (!config.isEmpty()) {
+          savepointPath = 
config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+        }
+      } else {
+        // 3.2) At the yarn or k8s mode, then read the savepoint in 
flink-conf.yml in the bound
+        // flink
+        FlinkEnv flinkEnv = 
flinkEnvService.getById(application.getVersionId());
+        savepointPath =
+            
flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
+      }
+    }
+
+    return savepointPath;
+  }
+
+  @Override
+  public void trigger(Long appId, @Nullable String savepointPath) {
+    log.info("Start to trigger savepoint for app {}", appId);
+    Application application = applicationService.getById(appId);
+
+    FlinkRESTAPIWatcher.addSavepoint(application.getId());
+
+    application.setOptionState(OptionState.SAVEPOINTING.getValue());
+    application.setOptionTime(new Date());
+    this.applicationService.updateById(application);
+    flinkRESTAPIWatcher.init();
+
+    FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
+
+    // infer savepoint
+    String customSavepoint = this.getFinalSavepointDir(savepointPath, 
application);
+
+    FlinkCluster cluster = 
flinkClusterService.getById(application.getFlinkClusterId());
+    String clusterId = getClusterId(application, cluster);
+
+    Map<String, Object> properties = this.tryGetRestProps(application, 
cluster);
+
+    TriggerSavepointRequest request =
+        new TriggerSavepointRequest(
+            flinkEnv.getFlinkVersion(),
+            ExecutionMode.of(application.getExecutionMode()),

Review Comment:
   code can be improved:
   `application.getExecutionModeEnum()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to