This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch k8s-deploy
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/k8s-deploy by this push:
new 8ad6dedc8 minor improvement
8ad6dedc8 is described below
commit 8ad6dedc831069554badf6007a70cd4de0ca74f8
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 14 19:05:22 2024 +0800
minor improvement
---
.../console/core/service/impl/FlinkClusterServiceImpl.java | 12 +++++++-----
.../apache/streampark/flink/client/bean/CancelRequest.scala | 12 ++++++------
.../apache/streampark/flink/client/bean/DeployResponse.scala | 2 +-
.../flink/client/bean/TriggerSavepointRequest.scala | 12 ++++++------
.../flink/client/impl/KubernetesNativeSessionClient.scala | 2 +-
.../streampark/flink/client/impl/YarnSessionClient.scala | 4 ++--
.../streampark/flink/client/trait/FlinkClientTrait.scala | 4 ++--
7 files changed, 25 insertions(+), 23 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index f1e68bdbc..a207d28ff 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -165,7 +165,13 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
Future<DeployResponse> future =
executorService.submit(() -> FlinkClient.deploy(deployRequest));
DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
- if (deployResponse.error() == null) {
+ if (deployResponse.error() != null) {
+ throw new ApiDetailException(
+ "deploy cluster "
+ + flinkCluster.getClusterName()
+ + "failed, exception:\n"
+ + Utils.stringifyException(deployResponse.error()));
+ } else {
if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
String address =
YarnUtils.getRMWebAppURL(true) + "/proxy/" +
deployResponse.clusterId() + "/";
@@ -177,10 +183,6 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setClusterState(ClusterState.STARTED.getValue());
flinkCluster.setException(null);
updateById(flinkCluster);
- } else {
- throw new ApiAlertException(
- "deploy cluster failed, exception:\n"
- + Utils.stringifyException(deployResponse.error()));
}
} catch (Exception e) {
log.error(e.getMessage(), e);
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index f11f561c9..7587f750e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -25,13 +25,13 @@ import javax.annotation.Nullable
import java.util.{Map => JavaMap}
case class CancelRequest(
- flinkVersion: FlinkVersion,
- executionMode: ExecutionMode,
- @Nullable properties: JavaMap[String, Any],
- clusterId: String,
- jobId: String,
+ override val flinkVersion: FlinkVersion,
+ override val executionMode: ExecutionMode,
+ @Nullable override val properties: JavaMap[String, Any],
+ override val clusterId: String,
+ override val jobId: String,
override val withSavepoint: Boolean,
withDrain: Boolean,
- savepointPath: String,
+ override val savepointPath: String,
override val kubernetesNamespace: String =
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
extends SavepointRequestTrait
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
index 82c452ece..6e1edbaff 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployResponse.scala
@@ -17,4 +17,4 @@
package org.apache.streampark.flink.client.bean
-case class DeployResponse(address: String, clusterId: String, error: Throwable = null)
+case class DeployResponse(address: String = null, clusterId: String = null, error: Throwable = null)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 8f6344834..7bb87b06e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -26,11 +26,11 @@ import java.util.{Map => JavaMap}
/** Trigger savepoint request. */
case class TriggerSavepointRequest(
- flinkVersion: FlinkVersion,
- executionMode: ExecutionMode,
- @Nullable properties: JavaMap[String, Any],
- clusterId: String,
- jobId: String,
- savepointPath: String,
+ override val flinkVersion: FlinkVersion,
+ override val executionMode: ExecutionMode,
+ @Nullable override val properties: JavaMap[String, Any],
+ override val clusterId: String,
+ override val jobId: String,
+ override val savepointPath: String,
override val kubernetesNamespace: String =
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
extends SavepointRequestTrait
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index cc74e31a9..3606c03a0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -171,7 +171,7 @@ object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with Lo
}
getDeployResponse(client)
} catch {
- case e: Exception => DeployResponse(null, null, e)
+ case e: Exception => DeployResponse(error = e)
} finally {
Utils.close(client, clusterDescriptor, kubeClient)
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 328806c68..0e0b4b3c9 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -193,14 +193,14 @@ object YarnSessionClient extends YarnClientTrait {
}
}
} catch {
- case e: ApplicationNotFoundException => return DeployResponse(null, null, e)
+ case e: Exception => return DeployResponse(error = e)
}
}
val clientProvider =
clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
client = clientProvider.getClusterClient
getDeployResponse(client)
} catch {
- case e: Exception => DeployResponse(null, null, e)
+ case e: Exception => DeployResponse(error = e)
} finally {
Utils.close(client, clusterDescriptor)
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 315f2afcf..09b4ed8ad 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -526,9 +526,9 @@ trait FlinkClientTrait extends Logger {
def getDeployResponse(client: ClusterClient[_]): DeployResponse = {
if (client.getWebInterfaceURL != null) {
- DeployResponse(client.getWebInterfaceURL, client.getClusterId.toString)
+ DeployResponse(address = client.getWebInterfaceURL, clusterId = client.getClusterId.toString)
} else {
- DeployResponse(null, null, new RuntimeException("get the cluster getWebInterfaceURL failed."))
+ DeployResponse(error = new RuntimeException("get the cluster getWebInterfaceURL failed."))
}
}
}