This is an automated email from the ASF dual-hosted git repository.
linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 8ff30e3de Refactor the lifecycle control of flink application mode
-dosubmit (#2994)
8ff30e3de is described below
commit 8ff30e3de6c233adc5601839aaa73c4d39bc78cd
Author: caicancai <[email protected]>
AuthorDate: Thu Sep 7 18:39:24 2023 +0800
Refactor the lifecycle control of flink application mode -dosubmit (#2994)
* remove scala2.11
* [Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink
application-mode_dosubmit
* add v2 dependency
* mvn spotless:apply
* throw exception
* optimization
* Code optimization
* add comment
* add comment
* add zio dependency
* resolve dependency conflicts
* add KubernetesNativeApplicationClient_V2
* Parameter optimization and add license
* Parameter optimization and add license
* add flinkconfig param
* Rename to KubernetesNativeApplicationV2
---
.../streampark-flink-client-core/pom.xml | 27 +++++
.../impl/KubernetesNativeApplicationClientV2.scala | 121 +++++++++++++++++++++
2 files changed, 148 insertions(+)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
index 9e942fc1f..506d428df 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
@@ -51,6 +51,18 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-kubernetes-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn${scala.binary.flink.version}</artifactId>
@@ -94,6 +106,21 @@
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
</dependency>
+ <!-- ZIO -->
+ <dependency>
+ <groupId>dev.zio</groupId>
+ <artifactId>zio-logging_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>dev.zio</groupId>
+ <artifactId>zio-streams_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>dev.zio</groupId>
+ <artifactId>zio-concurrent_${scala.binary.version}</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
new file mode 100644
index 000000000..f43e88f0b
--- /dev/null
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.client.impl
+
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode,
ExecutionMode}
+import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
+import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef,
JobDef, JobManagerDef, TaskManagerDef}
+import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.configuration.{Configuration, DeploymentOptions,
ExecutionOptions, JobManagerOptions, PipelineOptions, TaskManagerOptions}
+import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+
/**
 * Flink client for native Kubernetes application mode, backed by the
 * Flink-K8s-V2 operator integration ([[FlinkK8sOperator]]).
 *
 * Submission translates the [[SubmitRequest]] into a [[FlinkDeploymentDef]]
 * spec and hands it to the operator; cancel and savepoint calls are delegated
 * to [[KubernetesNativeClientTrait]] after pinning the deployment target to
 * kubernetes application mode.
 */
object KubernetesNativeApplicationClientV2 extends KubernetesNativeClientTrait {

  /**
   * Submits a Flink job in Kubernetes native application mode.
   *
   * @param submitRequest description of the job; its build result must be a
   *                      [[DockerImageBuildResponse]] (validated by
   *                      `checkBuildResult()`)
   * @param flinkConfig   effective Flink configuration for this submission
   * @return the submit response carrying the flink config map and job id
   * @throws Exception if the precondition checks fail or the operator
   *                   rejects the deployment
   */
  @throws[Exception]
  override def doSubmit(
      submitRequest: SubmitRequest,
      flinkConfig: Configuration): SubmitResponse = {

    // clusterId is mandatory: it names the deployment created in Kubernetes.
    require(
      StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
      s"[flink-submit] submit flink job failed, clusterId is null, mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
    )

    // Fail fast if the image build pipeline has not produced a usable result.
    submitRequest.checkBuildResult()

    try {
      val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, flinkConfig)
      // deployApplicationJob returns a ZIO effect; runIO executes it synchronously.
      FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec).runIO
      logInfo(s"[flink-submit] flink job has been submitted. ${flinkConfIdentifierInfo(flinkConfig)}")
      SubmitResponse(null, flinkConfig.toMap, submitRequest.jobId, null)
    } catch {
      case e: Exception =>
        logError(s"submit flink job fail in ${submitRequest.executionMode} mode")
        throw e
    }
  }

  /**
   * Cancels a running job. The deployment target is forced to
   * kubernetes-application before delegating to the shared cancel logic.
   */
  override def doCancel(
      cancelRequest: CancelRequest,
      flinkConfig: Configuration): CancelResponse = {
    flinkConfig.safeSet(
      DeploymentOptions.TARGET,
      ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
    super.doCancel(cancelRequest, flinkConfig)
  }

  /**
   * Triggers a savepoint for a running job. The deployment target is forced to
   * kubernetes-application before delegating to the shared savepoint logic.
   */
  override def doTriggerSavepoint(
      request: TriggerSavepointRequest,
      flinkConf: Configuration): SavepointResponse = {
    flinkConf.safeSet(
      DeploymentOptions.TARGET,
      ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
    super.doTriggerSavepoint(request, flinkConf)
  }

  /**
   * Translates a [[SubmitRequest]] into the FlinkDeployment definition
   * consumed by the V2 operator.
   *
   * NOTE(review): jobmanager/taskmanager cpu is hard-coded to 1 while memory
   * comes from the Flink configuration — confirm whether cpu should also be
   * configurable.
   */
  private[this] def convertFlinkDeploymentDef(
      submitRequest: SubmitRequest,
      flinkConfig: Configuration): FlinkDeploymentDef = {
    val buildResult = submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse]

    // Map e.g. "1.17" to the CRD enum name "V1_17"; fail loudly on unknown versions.
    val flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
      .map(v => "V" + v.replace(".", "_"))
      .flatMap(v => FlinkVersion.values().find(_.name() == v))
      .getOrElse(throw new IllegalArgumentException("Flink version not found"))

    // java.util.Map#get returns null for a missing key; guard so a null element
    // never leaks into the job args array.
    val programArgs = Option(flinkConfig.toMap.get("$internal.application.program-args"))
      .map(Array(_))
      .getOrElse(Array.empty[String])

    FlinkDeploymentDef(
      name = submitRequest.appName,
      namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
      image = buildResult.flinkImageTag,
      flinkVersion = flinkVersion,
      jobManager = JobManagerDef(
        cpu = 1,
        memory = flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
      taskManager = TaskManagerDef(
        cpu = 1,
        memory = flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString),
      job = Some(
        JobDef(
          jarURI = buildResult.dockerInnerMainJarPath,
          parallelism = 1,
          args = programArgs,
          // Option(...) rather than Some(...): these request fields may be null
          // (e.g. no savepoint to restore from), and Some(null) would defeat
          // the Option contract downstream.
          entryClass = Option(submitRequest.appMain),
          initialSavepointPath = Option(submitRequest.savePoint),
          allowNonRestoredState = Option(submitRequest.allowNonRestoredState)
        )),
      extJarPaths = Option(submitRequest.userJarFile)
        .map(file => Array(file.getAbsolutePath))
        .getOrElse(Array.empty[String]),
      flinkConfiguration = Option(submitRequest.extraParameter)
        .map(_.asScala.map { case (key, value) => key -> value.toString }.toMap)
        .getOrElse(Map.empty)
    )
  }
}