This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch k8s-deploy
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/k8s-deploy by this push:
     new 8ad6dedc8 minor improvement
8ad6dedc8 is described below

commit 8ad6dedc831069554badf6007a70cd4de0ca74f8
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 14 19:05:22 2024 +0800

    minor improvement
---
 .../console/core/service/impl/FlinkClusterServiceImpl.java   | 12 +++++++-----
 .../apache/streampark/flink/client/bean/CancelRequest.scala  | 12 ++++++------
 .../apache/streampark/flink/client/bean/DeployResponse.scala |  2 +-
 .../flink/client/bean/TriggerSavepointRequest.scala          | 12 ++++++------
 .../flink/client/impl/KubernetesNativeSessionClient.scala    |  2 +-
 .../streampark/flink/client/impl/YarnSessionClient.scala     |  4 ++--
 .../streampark/flink/client/trait/FlinkClientTrait.scala     |  4 ++--
 7 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index f1e68bdbc..a207d28ff 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -165,7 +165,13 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       Future<DeployResponse> future =
           executorService.submit(() -> FlinkClient.deploy(deployRequest));
       DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
-      if (deployResponse.error() == null) {
+      if (deployResponse.error() != null) {
+        throw new ApiDetailException(
+            "deploy cluster "
+                + flinkCluster.getClusterName()
+                + "failed, exception:\n"
+                + Utils.stringifyException(deployResponse.error()));
+      } else {
         if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
           String address =
               YarnUtils.getRMWebAppURL(true) + "/proxy/" + deployResponse.clusterId() + "/";
@@ -177,10 +183,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         flinkCluster.setClusterState(ClusterState.STARTED.getValue());
         flinkCluster.setException(null);
         updateById(flinkCluster);
-      } else {
-        throw new ApiAlertException(
-            "deploy cluster failed, exception:\n"
-                + Utils.stringifyException(deployResponse.error()));
       }
     } catch (Exception e) {
       log.error(e.getMessage(), e);
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index f11f561c9..7587f750e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -25,13 +25,13 @@ import javax.annotation.Nullable
 import java.util.{Map => JavaMap}
 
 case class CancelRequest(
-    flinkVersion: FlinkVersion,
-    executionMode: ExecutionMode,
-    @Nullable properties: JavaMap[String, Any],
-    clusterId: String,
-    jobId: String,
+    override val flinkVersion: FlinkVersion,
+    override val executionMode: ExecutionMode,
+    @Nullable override val properties: JavaMap[String, Any],
+    override val clusterId: String,
+    override val jobId: String,
     override val withSavepoint: Boolean,
     withDrain: Boolean,
-    savepointPath: String,
+    override val savepointPath: String,
     override val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
   extends SavepointRequestTrait
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
index 82c452ece..6e1edbaff 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
@@ -17,4 +17,4 @@
 
 package org.apache.streampark.flink.client.bean
 
-case class DeployResponse(address: String, clusterId: String, error: Throwable = null)
+case class DeployResponse(address: String = null, clusterId: String = null, error: Throwable = null)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 8f6344834..7bb87b06e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -26,11 +26,11 @@ import java.util.{Map => JavaMap}
 
 /** Trigger savepoint request. */
 case class TriggerSavepointRequest(
-    flinkVersion: FlinkVersion,
-    executionMode: ExecutionMode,
-    @Nullable properties: JavaMap[String, Any],
-    clusterId: String,
-    jobId: String,
-    savepointPath: String,
+    override val flinkVersion: FlinkVersion,
+    override val executionMode: ExecutionMode,
+    @Nullable override val properties: JavaMap[String, Any],
+    override val clusterId: String,
+    override val jobId: String,
+    override val savepointPath: String,
     override val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
   extends SavepointRequestTrait
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index cc74e31a9..3606c03a0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -171,7 +171,7 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
       }
       getDeployResponse(client)
     } catch {
-      case e: Exception => DeployResponse(null, null, e)
+      case e: Exception => DeployResponse(error = e)
     } finally {
       Utils.close(client, clusterDescriptor, kubeClient)
     }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 328806c68..0e0b4b3c9 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -193,14 +193,14 @@ object YarnSessionClient extends YarnClientTrait {
             }
           }
         } catch {
-          case e: ApplicationNotFoundException => return DeployResponse(null, null, e)
+          case e: Exception => return DeployResponse(error = e)
         }
       }
       val clientProvider = clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
       client = clientProvider.getClusterClient
       getDeployResponse(client)
     } catch {
-      case e: Exception => DeployResponse(null, null, e)
+      case e: Exception => DeployResponse(error = e)
     } finally {
       Utils.close(client, clusterDescriptor)
     }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 315f2afcf..09b4ed8ad 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -526,9 +526,9 @@ trait FlinkClientTrait extends Logger {
 
   def getDeployResponse(client: ClusterClient[_]): DeployResponse = {
     if (client.getWebInterfaceURL != null) {
-      DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
+      DeployResponse(address = client.getWebInterfaceURL, clusterId = client.getClusterId.toString)
     } else {
-      DeployResponse(null, null, new RuntimeException("get the cluster getWebInterfaceURL failed."))
+      DeployResponse(error = new RuntimeException("get the cluster getWebInterfaceURL failed."))
     }
   }
 }

Reply via email to