This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
     new a64224407  [Improve] fixed session cluster start/shutdown wait time config. (#3626)
a64224407 is described below

commit a64224407b2b4370cfc401c4f1a86309c773c5f2
Author: ouyangwulin <[email protected]>
AuthorDate: Sun Mar 24 00:15:45 2024 +0800

     [Improve] fixed session cluster start/shutdown wait time config. (#3626)
    
    * [Improve] add yarn api error log for debug some problems
    
    * [Improve] fixed session cluster start/shutdown wait time config.
    
    ---------
    
    Co-authored-by: [email protected] <[email protected]>
---
 .../core/service/impl/FlinkClusterServiceImpl.java | 96 +++++++++++-----------
 1 file changed, 50 insertions(+), 46 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 1240313d9..5bbaa768d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -57,8 +57,8 @@ import 
org.springframework.transaction.annotation.Transactional;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -172,39 +172,41 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
       DeployRequest deployRequest = getDeployRequest(flinkCluster);
 
       log.info("deploy cluster request: {}", deployRequest);
-      Future<DeployResponse> future =
-          bootstrapExecutor.submit(() -> FlinkClient.deploy(deployRequest));
-      DeployResponse deployResponse = future.get(5, TimeUnit.SECONDS);
-      if (deployResponse.error() != null) {
-        throw new ApiDetailException(
-            "deploy cluster "
-                + flinkCluster.getClusterName()
-                + "failed, exception:\n"
-                + Utils.stringifyException(deployResponse.error()));
-      } else {
-        // 2) setAddress
-        if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
-          String address =
-              YarnUtils.getRMWebAppURL(true) + "/proxy/" + 
deployResponse.clusterId() + "/";
-          flinkCluster.setAddress(address);
-        } else {
-          flinkCluster.setAddress(deployResponse.address());
-        }
-        flinkCluster.setClusterId(deployResponse.clusterId());
-        flinkCluster.setClusterState(ClusterState.STARTED.getValue());
-        flinkCluster.setException(null);
-        updateById(flinkCluster);
-
-        // k8s session mode ingress
-        if (ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(executionModeEnum)) 
{
-          try {
-            serviceHelper.configureIngress(
-                flinkCluster.getClusterId(), flinkCluster.getK8sNamespace());
-          } catch (KubernetesClientException e) {
-            log.info("Failed to create ingress: {}", e.getMessage());
-          }
-        }
-      }
+      CompletableFuture<DeployResponse> future =
+          CompletableFuture.supplyAsync(() -> 
FlinkClient.deploy(deployRequest));
+      future.whenComplete(
+          ((deployResponse, throwable) -> {
+            if (throwable != null) {
+              throw new ApiDetailException(
+                  "deploy cluster "
+                      + flinkCluster.getClusterName()
+                      + "failed, exception:\n"
+                      + Utils.stringifyException(deployResponse.error()));
+            } else {
+              // 2) setAddress
+              if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
+                String address =
+                    YarnUtils.getRMWebAppURL(true) + "/proxy/" + 
deployResponse.clusterId() + "/";
+                flinkCluster.setAddress(address);
+              } else {
+                flinkCluster.setAddress(deployResponse.address());
+              }
+              flinkCluster.setClusterId(deployResponse.clusterId());
+              flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+              flinkCluster.setException(null);
+              updateById(flinkCluster);
+
+              // k8s session mode ingress
+              if 
(ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(executionModeEnum)) {
+                try {
+                  serviceHelper.configureIngress(
+                      flinkCluster.getClusterId(), 
flinkCluster.getK8sNamespace());
+                } catch (KubernetesClientException e) {
+                  log.info("Failed to create ingress: {}", e.getMessage());
+                }
+              }
+            }
+          }));
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
@@ -311,18 +313,20 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
     // 4) shutdown
     DeployRequest deployRequest = getDeployRequest(flinkCluster);
     try {
-      Future<ShutDownResponse> future =
-          bootstrapExecutor.submit(() -> FlinkClient.shutdown(deployRequest));
-      ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
-      if (shutDownResponse.error() != null) {
-        throw new ApiDetailException(
-            "shutdown cluster failed, error: \n"
-                + Utils.stringifyException(shutDownResponse.error()));
-      } else {
-        flinkCluster.setAddress(null);
-        flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
-        updateById(flinkCluster);
-      }
+      CompletableFuture<ShutDownResponse> future =
+          CompletableFuture.supplyAsync(() -> 
FlinkClient.shutdown(deployRequest));
+      future.whenComplete(
+          ((shutDownResponse, throwable) -> {
+            if (shutDownResponse.error() != null) {
+              throw new ApiDetailException(
+                  "shutdown cluster failed, error: \n"
+                      + Utils.stringifyException(shutDownResponse.error()));
+            } else {
+              flinkCluster.setAddress(null);
+              flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
+              updateById(flinkCluster);
+            }
+          }));
     } catch (Exception e) {
       log.error(e.getMessage(), e);
       flinkCluster.setException(e.toString());

Reply via email to