zhoujinsong commented on code in PR #2676:
URL: https://github.com/apache/incubator-amoro/pull/2676#discussion_r1549183261


##########
ams/server/src/main/java/com/netease/arctic/server/manager/FlinkOptimizerContainer.java:
##########
@@ -128,30 +170,41 @@ public void init(String name, Map<String, String> 
containerProperties) {
 
   @Override
   protected Map<String, String> doScaleOut(Resource resource) {
-    String startUpArgs = this.buildOptimizerStartupArgsString(resource);
-    Runtime runtime = Runtime.getRuntime();
-    try {
-      String exportCmd = String.join(" && ", exportSystemProperties());
-      String startUpCmd = String.format("%s && %s", exportCmd, startUpArgs);
-      String[] cmd = {"/bin/sh", "-c", startUpCmd};
-      LOG.info("Starting flink optimizer using command : {}", startUpCmd);
-      Process exec = runtime.exec(cmd);
-      Map<String, String> startUpStatesMap = Maps.newHashMap();
-      switch (target) {
-        case YARN_PER_JOB:
-        case YARN_APPLICATION:
-          String applicationId = fetchCommandOutput(exec, 
yarnApplicationIdReader);
-          if (applicationId != null) {
-            startUpStatesMap.put(YARN_APPLICATION_ID_PROPERTY, applicationId);
-          }
-          break;
-        case KUBERNETES_APPLICATION:
-          startUpStatesMap.put(KUBERNETES_CLUSTER_ID_PROPERTY, 
kubernetesClusterId(resource));
-          break;
+    if (target.runByFlinkRestClient()) {
+      try {
+        JobID jobID = runFlinkOptimizerJob(resource, loadFlinkConfig());

Review Comment:
   Should we support reloading session cluster information through the Flink 
optimizer container by using flink-conf. properties in the container and group?



##########
ams/server/src/main/java/com/netease/arctic/server/manager/FlinkOptimizerContainer.java:
##########
@@ -18,28 +18,55 @@
 
 package com.netease.arctic.server.manager;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.netease.arctic.api.OptimizerProperties;
 import com.netease.arctic.api.resource.Resource;
+import com.netease.arctic.server.utils.FlinkClientUtil;
 import org.apache.commons.lang3.StringUtils;
 import 
org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;

Review Comment:
   Can you help to change the class to the iceberg shaded one?



##########
ams/server/src/main/java/com/netease/arctic/server/manager/FlinkOptimizerContainer.java:
##########
@@ -307,27 +360,41 @@ private <T> T fetchCommandOutput(Process exec, 
Function<String, T> commandReader
 
   @Override
   public void releaseOptimizer(Resource resource) {
-    String releaseCommand;
-    switch (target) {
-      case YARN_APPLICATION:
-      case YARN_PER_JOB:
-        releaseCommand = buildReleaseYarnCommand(resource);
-        break;
-      case KUBERNETES_APPLICATION:
-        releaseCommand = buildReleaseKubernetesCommand(resource);
-        break;
-      default:
-        throw new IllegalStateException("Unsupported running target: " + 
target.getValue());
-    }
+    if (target.runByFlinkRestClient()) {
+      Preconditions.checkArgument(
+          resource.getProperties().containsKey(SESSION_CLUSTER_JOB_ID),
+          "Cannot find {} from optimizer start up stats",
+          SESSION_CLUSTER_JOB_ID);
+      String jobId = resource.getProperties().get(SESSION_CLUSTER_JOB_ID);
+      try (RestClusterClient<String> restClusterClient =
+          FlinkClientUtil.getRestClusterClient(loadFlinkConfig())) {
+        Acknowledge ignore = 
restClusterClient.cancel(JobID.fromHexString(jobId)).get();

Review Comment:
   Do we need to check if the execution of the results was successful?



##########
ams/server/src/main/java/com/netease/arctic/server/manager/FlinkOptimizerContainer.java:
##########
@@ -18,28 +18,55 @@
 
 package com.netease.arctic.server.manager;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;

Review Comment:
   We may need to import the shaded class.



##########
ams/server/src/main/java/com/netease/arctic/server/manager/FlinkOptimizerContainer.java:
##########
@@ -87,16 +117,28 @@ public class FlinkOptimizerContainer extends 
AbstractResourceContainer {
         return null;
       };
 
+  private final Supplier<ExecutorService> clientExecutorServiceSupplier =
+      Suppliers.memoize(
+          () ->
+              Executors.newFixedThreadPool(
+                  5,
+                  new ThreadFactoryBuilder()
+                      .setNameFormat("Flink-RestClusterClient-IO-%d")

Review Comment:
   We may change the thread name format to `flink-rest-cluster-client-io-%d` 
according to:https://github.com/apache/incubator-amoro/issues/2341



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to