This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new babb49482 [Improve] Improve streampark-flink-client-core module base
on [Naming Style] (#3228)
babb49482 is described below
commit babb4948287774de4b891716e3c7d35eedf6db26
Author: caicancai <[email protected]>
AuthorDate: Tue Oct 10 12:00:50 2023 +0800
[Improve] Improve streampark-flink-client-core module base on [Naming
Style] (#3228)
---
.../flink/client/impl/KubernetesSessionClientV2.scala | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index 3cbfbdd2e..c40fba899 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -176,15 +176,15 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
}
private def genFlinkDeployDef(
- deployRequest: DeployRequest,
+ deployReq: DeployRequest,
originFlinkConfig: Configuration): Either[FailureMessage, FlinkDeploymentDef] = {
val flinkConfObj = originFlinkConfig.clone()
val flinkConfMap = originFlinkConfig.toMap.asScala.toMap
- val namespace = Option(deployRequest.k8sDeployParam.kubernetesNamespace)
+ val namespace = Option(deployReq.k8sDeployParam.kubernetesNamespace)
.getOrElse("default")
- val name = Option(deployRequest.k8sDeployParam.clusterId)
+ val name = Option(deployReq.k8sDeployParam.clusterId)
.filter(str => StringUtils.isNotBlank(str))
.getOrElse(return Left("Kubernetes CR name should not be empty"))
@@ -192,18 +192,17 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
.getOption(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
.map(_.toString)
- val image = Option(deployRequest.k8sDeployParam.flinkImage)
+ val image = Option(deployReq.k8sDeployParam.flinkImage)
.filter(str => StringUtils.isNotBlank(str))
.getOrElse(return Left("Flink base image should not be empty"))
- val serviceAccount = Option(deployRequest.k8sDeployParam.serviceAccount)
+ val serviceAccount = Option(deployReq.k8sDeployParam.serviceAccount)
.getOrElse(FlinkDeploymentDef.DEFAULT_SERVICE_ACCOUNT)
- val flinkVersion = Option(deployRequest.flinkVersion.majorVersion)
+ val flinkVersion = Option(deployReq.flinkVersion.majorVersion)
.map(majorVer => "V" + majorVer.replace(".", "_"))
.flatMap(v => FlinkVersion.values().find(_.name() == v))
- .getOrElse(
- return Left(s"Unsupported Flink version:${deployRequest.flinkVersion.majorVersion}"))
+ .getOrElse(return Left(s"Unsupported Flink version:${deployReq.flinkVersion.majorVersion}"))
val jobManager = {
val cpu = flinkConfMap
@@ -255,7 +254,7 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
.removeKey(KUBERNETES_TM_CPU_KEY)
.removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
.removeKey(KUBERNETES_JM_CPU_KEY)
- Option(deployRequest.k8sDeployParam.flinkRestExposedType).foreach {
+ Option(deployReq.k8sDeployParam.flinkRestExposedType).foreach {
exposedType => result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY ->
exposedType.getName
}
result.toMap