This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s-deploy
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit aac96404734de84de88277a04437fea08e28d3e1
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 14 16:39:31 2024 +0800

    [Improve] k8s deploy-cluster | start-app error info improvement
---
 .../core/service/impl/ApplicationServiceImpl.java  |  8 +++--
 .../core/service/impl/FlinkClusterServiceImpl.java | 14 +++++----
 .../flink/client/bean/DeployResponse.scala         |  2 +-
 .../flink/client/bean/ShutDownResponse.scala       |  2 +-
 .../impl/KubernetesNativeSessionClient.scala       | 34 +++++++++-------------
 .../flink/client/impl/YarnSessionClient.scala      | 16 ++--------
 .../flink/client/trait/FlinkClientTrait.scala      |  7 +++++
 7 files changed, 39 insertions(+), 44 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index c27e5772e..5ed782953 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -79,7 +79,7 @@ import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.YarnQueueService;
 import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
-import org.apache.streampark.flink.client.FlinkClient;
+import org.apache.streampark.flink.client.FlinkClientHandler;
 import org.apache.streampark.flink.client.bean.CancelRequest;
 import org.apache.streampark.flink.client.bean.CancelResponse;
 import org.apache.streampark.flink.client.bean.SubmitRequest;
@@ -1324,7 +1324,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
     final Date triggerTime = new Date();
     CompletableFuture<CancelResponse> cancelFuture =
-        CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService);
+        CompletableFuture.supplyAsync(
+            () -> FlinkClientHandler.cancel(cancelRequest), executorService);
 
     cancelFutureMap.put(application.getId(), cancelFuture);
 
@@ -1591,7 +1592,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
     TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : null;
     CompletableFuture<SubmitResponse> future =
-        CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService);
+        CompletableFuture.supplyAsync(
+            () -> FlinkClientHandler.submit(submitRequest), executorService);
 
     startFutureMap.put(application.getId(), future);
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index ef1f4a283..84e10e45a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.service.impl;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.util.ThreadUtils;
+import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
@@ -157,7 +158,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       Future<DeployResponse> future =
           executorService.submit(() -> FlinkClient.deploy(deployRequest));
       DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
-      if (deployResponse != null) {
+      if (deployResponse.error() == null) {
         if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
           String address =
               YarnUtils.getRMWebAppURL(true) + "/proxy/" + deployResponse.clusterId() + "/";
@@ -171,7 +172,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         updateById(flinkCluster);
       } else {
         throw new ApiAlertException(
-            "deploy cluster failed, unknown reason,please check you params or StreamPark error log");
+            "deploy cluster failed, exception:\n"
+                + Utils.stringifyException(deployResponse.error()));
       }
     } catch (Exception e) {
       log.error(e.getMessage(), e);
@@ -279,12 +281,14 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       Future<ShutDownResponse> future =
           executorService.submit(() -> FlinkClient.shutdown(deployRequest));
       ShutDownResponse shutDownResponse = future.get(60, TimeUnit.SECONDS);
-      if (shutDownResponse != null) {
+      if (shutDownResponse.error() != null) {
+        throw new ApiDetailException(
+            "shutdown cluster failed, error: \n"
+                + Utils.stringifyException(shutDownResponse.error()));
+      } else {
         flinkCluster.setAddress(null);
         flinkCluster.setClusterState(ClusterState.STOPPED.getValue());
         updateById(flinkCluster);
-      } else {
-        throw new ApiAlertException("get shutdown response failed");
       }
     } catch (Exception e) {
       log.error(e.getMessage(), e);
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
index 91b6ecad0..82c452ece 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
@@ -17,4 +17,4 @@
 
 package org.apache.streampark.flink.client.bean
 
-case class DeployResponse(address: String, clusterId: String)
+case class DeployResponse(address: String, clusterId: String, error: Throwable = null)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
index 5c9a14728..de2df366f 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/ShutDownResponse.scala
@@ -17,4 +17,4 @@
 
 package org.apache.streampark.flink.client.bean
 
-case class ShutDownResponse()
+case class ShutDownResponse(error: Throwable = null)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 7638be720..cc74e31a9 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -169,16 +169,9 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
         client =
           clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
       }
-
-      if (client.getWebInterfaceURL != null) {
-        DeployResponse(client.getWebInterfaceURL, client.getClusterId)
-      } else {
-        null
-      }
+      getDeployResponse(client)
     } catch {
-      case e: Exception =>
-        logError(s"start flink session fail in ${deployRequest.executionMode} mode")
-        throw e
+      case e: Exception => DeployResponse(null, null, e)
     } finally {
       Utils.close(client, clusterDescriptor, kubeClient)
     }
@@ -204,18 +197,17 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
 
     val flinkConfig = this.getFlinkK8sConfig(shutDownRequest)
     val clusterDescriptor = getK8sClusterDescriptor(flinkConfig)
-    val client = clusterDescriptor
-      .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
-      .getClusterClient
-    try {
-      client.shutDownCluster()
-      ShutDownResponse()
-    } catch {
-      case e: Exception =>
-        logError(s"shutdown flink session fail in ${shutDownRequest.executionMode} mode")
-        throw e
-    } finally {
-      Utils.close(client)
+    Try(
+      clusterDescriptor
+        .retrieve(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID))
+        .getClusterClient
+    ) match {
+      case Failure(e) => ShutDownResponse(e)
+      case Success(c) =>
+        Try(c.shutDownCluster()) match {
+          case Success(_) => ShutDownResponse()
+          case Failure(e) => ShutDownResponse(e)
+        }
     }
   }
 
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index edcd6ed75..8fa9922ad 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -189,16 +189,9 @@ object YarnSessionClient extends YarnClientTrait {
       }
       val clientProvider = clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
       client = clientProvider.getClusterClient
-      if (client.getWebInterfaceURL != null) {
-        DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
-      } else {
-        null
-      }
+      getDeployResponse(client)
     } catch {
-      case e: Exception =>
-        logError(s"start flink session fail in ${deployRequest.executionMode} mode")
-        e.printStackTrace()
-        throw e
+      case e: Exception => DeployResponse(null, null, e)
     } finally {
       Utils.close(client, clusterDescriptor)
     }
@@ -234,10 +227,7 @@ object YarnSessionClient extends YarnClientTrait {
           .getFinalApplicationStatus}")
       ShutDownResponse()
     } catch {
-      case e: Exception =>
-        logError(s"shutdown flink session fail in ${shutDownRequest.executionMode} mode")
-        e.printStackTrace()
-        throw e
+      case e: Exception => ShutDownResponse(e)
     } finally {
       Utils.close(client, clusterDescriptor)
     }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index f1b171310..315f2afcf 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -524,4 +524,11 @@ trait FlinkClientTrait extends Logger {
     clientWrapper.triggerSavepoint(jobID, savepointPath).get()
   }
 
+  def getDeployResponse(client: ClusterClient[_]): DeployResponse = {
+    if (client.getWebInterfaceURL != null) {
+      DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
+    } else {
+      DeployResponse(null, null, new RuntimeException("get the cluster getWebInterfaceURL failed."))
+    }
+  }
 }

Reply via email to