This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 232d93f1f [Feature] [Flink-K8s-V2] Refactor the lifecycle control of
Flink session-mode jobs on Kubernetes (#3041)
232d93f1f is described below
commit 232d93f1fb65a2e1ae2bc1148a3e15c74845c5b4
Author: caicancai <[email protected]>
AuthorDate: Tue Sep 12 12:46:53 2023 +0800
[Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink
session-mode jobs on Kubernetes (#3041)
* [Feature] [Flink-K8s-V2] Refactor the lifecycle control of
Flink session-mode jobs on Kubernetes
* add Apache license header
---
.../flink/client/FlinkClientHandler.scala | 5 +-
.../client/impl/KubernetesSessionClientV2.scala | 157 +++++++++++++++++++++
2 files changed, 161 insertions(+), 1 deletion(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
index 810df34e3..3da974d7b 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
@@ -32,7 +32,10 @@ object FlinkClientHandler {
YARN_APPLICATION -> YarnApplicationClient,
YARN_SESSION -> YarnSessionClient,
YARN_PER_JOB -> YarnPerJobClient,
- KUBERNETES_NATIVE_SESSION -> KubernetesNativeSessionClient,
+ KUBERNETES_NATIVE_SESSION -> {
+ if (K8sFlinkConfig.isV2Enabled) KubernetesSessionClientV2
+ else KubernetesNativeSessionClient
+ },
KUBERNETES_NATIVE_APPLICATION -> {
if (K8sFlinkConfig.isV2Enabled) KubernetesApplicationClientV2
else KubernetesNativeApplicationClient
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
new file mode 100644
index 000000000..d5a9f51db
--- /dev/null
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.client.impl
+
+import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
+import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.v2.model.{FlinkSessionJobDef,
JobManagerDef, TaskManagerDef}
+import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.client.deployment.application.ApplicationConfiguration
+import org.apache.flink.configuration._
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.mapAsScalaMapConverter
+import scala.util.{Failure, Success, Try}
+
/**
 * Kubernetes session-mode client for the Flink-K8s-V2 integration.
 *
 * Submitting translates the request into a `FlinkSessionJobDef` custom resource and deploys it
 * through [[FlinkK8sOperator]]; [[shutdown]] deletes that resource again. The session job
 * resource is named after the session cluster id, and the same cluster id is used as its
 * `deploymentName`, i.e. the session cluster the job is submitted to.
 */
object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {

  /** Namespace used when a request leaves the Kubernetes namespace unset (null). */
  private val DefaultNamespace = "default"

  /** Maps a null namespace to [[DefaultNamespace]] so that submit and shutdown agree. */
  private def namespaceOrDefault(namespace: String): String =
    Option(namespace).getOrElse(DefaultNamespace)

  /**
   * Submits a Flink job to a Kubernetes session cluster.
   *
   * @param submitRequest StreamPark submit request; its build result is cast to
   *                      `K8sAppModeBuildResponse`.
   * @param flinkConfig   Flink configuration of the job; a clone of it is used to build the CR.
   * @return a response carrying the session cluster id, the Flink configuration and the job id
   *         (the JobManager URL is not resolved here and is left null).
   * @throws IllegalArgumentException if the request cannot be converted into a session job CR.
   * @throws Exception any error raised while deploying the CR, after it has been logged.
   */
  @throws[Exception]
  override def doSubmit(
      submitRequest: SubmitRequest,
      flinkConfig: Configuration): SubmitResponse = {

    val richMsg: String => String = s"[flink-submit][appId=${submitRequest.id}] " + _

    submitRequest.checkBuildResult()
    // NOTE(review): this reuses the application-mode build response type; confirm that the
    // session-mode build pipeline really produces a K8sAppModeBuildResponse, otherwise this cast
    // fails at runtime.
    val buildResult = submitRequest.buildResult.asInstanceOf[K8sAppModeBuildResponse]

    // Convert to FlinkSessionJobDef CR definition
    val flinkSessionJobDef = genFlinkSessionJobDef(submitRequest, flinkConfig, buildResult) match {
      case Right(result) => result
      case Left(errMsg) =>
        throw new IllegalArgumentException(
          richMsg(s"Error occurred while parsing parameters: $errMsg"))
    }

    // Submit FlinkSessionJobDef CR to Kubernetes
    FlinkK8sOperator.deploySessionJob(submitRequest.id, flinkSessionJobDef).runIOAsTry match {
      case Success(_) =>
        logInfo(richMsg("Flink job has been submitted successfully."))
      case Failure(err) =>
        logError(
          richMsg(s"Submit Flink job fail in ${submitRequest.executionMode.getName}_V2 mode!"),
          err)
        throw err
    }

    SubmitResponse(
      clusterId = submitRequest.k8sSubmitParam.clusterId,
      flinkConfig = flinkConfig.toMap,
      jobId = submitRequest.jobId,
      jobManagerUrl = null
    )
  }

  /**
   * Builds the `FlinkSessionJobDef` CR for a submit request. It only assembles the definition;
   * deployment happens in [[doSubmit]]. `originFlinkConfig` is cloned before any option is
   * stripped from it.
   *
   * @return `Right(definition)`, or `Left(message)` describing the first invalid parameter.
   */
  private def genFlinkSessionJobDef(
      submitReq: SubmitRequest,
      originFlinkConfig: Configuration,
      buildResult: K8sAppModeBuildResponse): Either[FailureMessage, FlinkSessionJobDef] = {

    val flinkConfObj = originFlinkConfig.clone()

    val namespace = namespaceOrDefault(submitReq.k8sSubmitParam.kubernetesNamespace)

    // The session cluster id is both the name of this session job CR (shutdown deletes the CR by
    // cluster id) and the deployment name of the session cluster the job targets.
    // Previously the deployment name was taken from the development-mode enum constant, which is
    // not the name of any session cluster (nor a valid lowercase Kubernetes resource name).
    val clusterId = Option(submitReq.k8sSubmitParam.clusterId)
      .filter(str => StringUtils.isNotBlank(str))
      .getOrElse(return Left("cluster-id should not be empty"))

    val jobDef = genJobDef(flinkConfObj, jarUriHint = Some(buildResult.mainJarPath))
      .getOrElse(return Left("Invalid job definition"))

    val extraFlinkConfiguration = {
      // Remove conflicting configuration items
      val result: mutable.Map[String, String] = flinkConfObj
        .remove(DeploymentOptions.TARGET)
        .remove(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
        .remove(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
        .remove(JobManagerOptions.TOTAL_PROCESS_MEMORY)
        .remove(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
        .remove(PipelineOptions.JARS)
        .remove(CoreOptions.DEFAULT_PARALLELISM)
        .remove(ApplicationConfiguration.APPLICATION_ARGS)
        .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
        .remove(SavepointConfigOptions.SAVEPOINT_PATH)
        .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
        .toMap
        .asScala
        // Strip both the JobManager and the TaskManager CPU keys (plain and ".amount" variants).
        .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
        .removeKey(KUBERNETES_TM_CPU_KEY)
        .removeKey(KUBERNETES_TM_CPU_AMOUNT_KEY)
        .removeKey(KUBERNETES_JM_CPU_KEY)
      // Set kubernetes.rest-service.exposed.type configuration for compatibility with native-k8s
      submitReq.k8sSubmitParam.flinkRestExposedType.foreach { exposedType =>
        result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY -> exposedType.getName
      }
      result.toMap
    }

    // TODO Migrate the construction logic of ingress to here and set it into FlinkDeploymentDef.ingress
    // See: org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline Step-8
    Right(
      FlinkSessionJobDef(
        namespace = namespace,
        name = clusterId,
        deploymentName = clusterId,
        flinkConfiguration = extraFlinkConfiguration,
        job = jobDef,
        restartNonce = None
      ))
  }

  /**
   * Deletes the session job CR named after the request's cluster id.
   *
   * A null namespace resolves to the same default as on submit, so a job submitted without an
   * explicit namespace is deleted from the namespace it was deployed to.
   *
   * @throws Exception the deletion error, after it has been logged.
   */
  def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
    val richMsg: String => String =
      s"[flink-delete][clusterId=${shutDownRequest.clusterId}] " + _

    val namespace = namespaceOrDefault(shutDownRequest.kubernetesDeployParam.kubernetesNamespace)

    FlinkK8sOperator.k8sCrOpr
      .deleteSessionJob(namespace, shutDownRequest.clusterId)
      .runIOAsTry match {
      case Success(_) =>
        logInfo(richMsg("Delete flink job successfully."))
      case Failure(err) =>
        logError(
          richMsg(s"delete flink job fail in ${shutDownRequest.executionMode.getName}_V2 mode!"),
          err)
        throw err
    }
    ShutDownResponse()
  }

}