huyuanfeng2018 commented on code in PR #2676:
URL: https://github.com/apache/incubator-amoro/pull/2676#discussion_r1555192973


##########
ams/server/src/main/java/com/netease/arctic/server/manager/FlinkOptimizerContainer.java:
##########
@@ -307,27 +360,41 @@ private <T> T fetchCommandOutput(Process exec, Function<String, T> commandReader
 
   @Override
   public void releaseOptimizer(Resource resource) {
-    String releaseCommand;
-    switch (target) {
-      case YARN_APPLICATION:
-      case YARN_PER_JOB:
-        releaseCommand = buildReleaseYarnCommand(resource);
-        break;
-      case KUBERNETES_APPLICATION:
-        releaseCommand = buildReleaseKubernetesCommand(resource);
-        break;
-      default:
-        throw new IllegalStateException("Unsupported running target: " + target.getValue());
-    }
+    if (target.runByFlinkRestClient()) {
+      Preconditions.checkArgument(
+          resource.getProperties().containsKey(SESSION_CLUSTER_JOB_ID),
+          "Cannot find {} from optimizer start up stats",
+          SESSION_CLUSTER_JOB_ID);
+      String jobId = resource.getProperties().get(SESSION_CLUSTER_JOB_ID);
+      try (RestClusterClient<String> restClusterClient =
+          FlinkClientUtil.getRestClusterClient(loadFlinkConfig())) {
+        Acknowledge ignore = restClusterClient.cancel(JobID.fromHexString(jobId)).get();

Review Comment:
   Capturing the return value is not needed here, because `Acknowledge` is just a generic acknowledgement message returned by the Flink client. If an error occurs, `get()` will throw the exception directly. However, we may need to add timeout handling here so the call cannot block indefinitely.
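   The bounded wait suggested above can be expressed with the timed overload of `Future.get`. The following is a minimal, self-contained sketch of the pattern only; the `cancelJob` stand-in and the 30-second limit are illustrative assumptions, not code from this PR:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class CancelWithTimeout {

       // Hypothetical stand-in for restClusterClient.cancel(jobId), which in
       // the Flink client returns a CompletableFuture<Acknowledge>.
       static CompletableFuture<String> cancelJob() {
           return CompletableFuture.supplyAsync(() -> "ack");
       }

       public static void main(String[] args) throws Exception {
           try {
               // Bound the wait instead of calling get() with no time limit.
               String ack = cancelJob().get(30, TimeUnit.SECONDS);
               System.out.println("cancel acknowledged: " + ack);
           } catch (TimeoutException e) {
               // Fail fast with a clear error instead of hanging indefinitely.
               throw new IllegalStateException("Timed out waiting for job cancel", e);
           }
       }
   }
   ```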
   
   
   



##########
ams/server/src/main/java/com/netease/arctic/server/manager/FlinkOptimizerContainer.java:
##########
@@ -128,30 +170,41 @@ public void init(String name, Map<String, String> containerProperties) {
 
   @Override
   protected Map<String, String> doScaleOut(Resource resource) {
-    String startUpArgs = this.buildOptimizerStartupArgsString(resource);
-    Runtime runtime = Runtime.getRuntime();
-    try {
-      String exportCmd = String.join(" && ", exportSystemProperties());
-      String startUpCmd = String.format("%s && %s", exportCmd, startUpArgs);
-      String[] cmd = {"/bin/sh", "-c", startUpCmd};
-      LOG.info("Starting flink optimizer using command : {}", startUpCmd);
-      Process exec = runtime.exec(cmd);
-      Map<String, String> startUpStatesMap = Maps.newHashMap();
-      switch (target) {
-        case YARN_PER_JOB:
-        case YARN_APPLICATION:
-          String applicationId = fetchCommandOutput(exec, yarnApplicationIdReader);
-          if (applicationId != null) {
-            startUpStatesMap.put(YARN_APPLICATION_ID_PROPERTY, applicationId);
-          }
-          break;
-        case KUBERNETES_APPLICATION:
-          startUpStatesMap.put(KUBERNETES_CLUSTER_ID_PROPERTY, kubernetesClusterId(resource));
-          break;
+    if (target.runByFlinkRestClient()) {
+      try {
+        JobID jobID = runFlinkOptimizerJob(resource, loadFlinkConfig());

Review Comment:
   Thanks for the reminder! This logic was indeed missing here.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to