This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 232d93f1f [Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink session-mode jobs on Kubernetes  (#3041)
232d93f1f is described below

commit 232d93f1fb65a2e1ae2bc1148a3e15c74845c5b4
Author: caicancai <[email protected]>
AuthorDate: Tue Sep 12 12:46:53 2023 +0800

    [Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink session-mode jobs on Kubernetes  (#3041)
    
    * [Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink session-mode jobs on Kubernetes
    
    * add Apache license header
---
 .../flink/client/FlinkClientHandler.scala          |   5 +-
 .../client/impl/KubernetesSessionClientV2.scala    | 157 +++++++++++++++++++++
 2 files changed, 161 insertions(+), 1 deletion(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
index 810df34e3..3da974d7b 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala
@@ -32,7 +32,10 @@ object FlinkClientHandler {
     YARN_APPLICATION -> YarnApplicationClient,
     YARN_SESSION -> YarnSessionClient,
     YARN_PER_JOB -> YarnPerJobClient,
-    KUBERNETES_NATIVE_SESSION -> KubernetesNativeSessionClient,
+    KUBERNETES_NATIVE_SESSION -> {
+      if (K8sFlinkConfig.isV2Enabled) KubernetesSessionClientV2
+      else KubernetesNativeSessionClient
+    },
     KUBERNETES_NATIVE_APPLICATION -> {
       if (K8sFlinkConfig.isV2Enabled) KubernetesApplicationClientV2
       else KubernetesNativeApplicationClient
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
new file mode 100644
index 000000000..d5a9f51db
--- /dev/null
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.client.impl
+
+import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
+import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.v2.model.{FlinkSessionJobDef, JobManagerDef, TaskManagerDef}
+import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import org.apache.streampark.flink.packer.pipeline.K8sAppModeBuildResponse
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.client.deployment.application.ApplicationConfiguration
+import org.apache.flink.configuration._
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.mapAsScalaMapConverter
+import scala.util.{Failure, Success, Try}
+
+object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
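+  // Submit the Flink job to an existing session cluster by deploying a FlinkSessionJob CR via the Flink Kubernetes Operator.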
+  @throws[Exception]
+  override def doSubmit(
+      submitRequest: SubmitRequest,
+      flinkConfig: Configuration): SubmitResponse = {
+
+    val richMsg: String => String = s"[flink-submit][appId=${submitRequest.id}] " + _
+
+    submitRequest.checkBuildResult()
+    val buildResult = submitRequest.buildResult.asInstanceOf[K8sAppModeBuildResponse]
+
+    // Convert to FlinkSessionJobDef CR definition
+    val flinkSessionJobDef = genFlinkSessionJobDef(submitRequest, flinkConfig, buildResult) match {
+      case Right(result) => result
+      case Left(errMsg) =>
+        throw new IllegalArgumentException(
+          richMsg(s"Error occurred while parsing parameters: $errMsg"))
+    }
+
+    // Submit FlinkSessionJobDef CR to Kubernetes
+    FlinkK8sOperator.deploySessionJob(submitRequest.id, flinkSessionJobDef).runIOAsTry match {
+      case Success(_) =>
+        logInfo(richMsg("Flink job has been submitted successfully."))
+      case Failure(err) =>
+        logError(
+          richMsg(s"Submit Flink job fail in 
${submitRequest.executionMode.getName}_V2 mode!"),
+          err)
+        throw err
+    }
+
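+    // Build the submit response; the JobManager URL is left unset (null) at this stage.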
+    SubmitResponse(
+      clusterId = submitRequest.k8sSubmitParam.clusterId,
+      flinkConfig = flinkConfig.toMap,
+      jobId = submitRequest.jobId,
+      jobManagerUrl = null
+    )
+  }
+
+  // Generate the FlinkSessionJobDef CR definition; this is a pure function without side effects.
+  private def genFlinkSessionJobDef(
+      submitReq: SubmitRequest,
+      originFlinkConfig: Configuration,
+      buildResult: K8sAppModeBuildResponse): Either[FailureMessage, FlinkSessionJobDef] = {
+
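+    // Work on a cloned Configuration so the removals below do not affect the caller's original config.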
+    val flinkConfObj = originFlinkConfig.clone()
+
+    val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
+      .getOrElse("default")
+
+    val name = Option(submitReq.k8sSubmitParam.clusterId)
+      .filter(str => StringUtils.isNotBlank(str))
+      .getOrElse(return Left("cluster-id should not be empty"))
+
+    val deploymentName = Option(submitReq.developmentMode.name())
+      .filter(str => StringUtils.isNotBlank(str))
+      .getOrElse(return Left("deploymentName should not be empty"))
+
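+    // Build the job spec; the jar URI comes from the main jar produced by the build pipeline.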
+    val jobDef = genJobDef(flinkConfObj, jarUriHint = Some(buildResult.mainJarPath))
+      .getOrElse(return Left("Invalid job definition"))
+
+    val extraFlinkConfiguration = {
+      // Remove conflicting configuration items
+      val result: mutable.Map[String, String] = flinkConfObj
+        .remove(DeploymentOptions.TARGET)
+        .remove(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
+        .remove(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
+        .remove(JobManagerOptions.TOTAL_PROCESS_MEMORY)
+        .remove(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
+        .remove(PipelineOptions.JARS)
+        .remove(CoreOptions.DEFAULT_PARALLELISM)
+        .remove(ApplicationConfiguration.APPLICATION_ARGS)
+        .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
+        .remove(SavepointConfigOptions.SAVEPOINT_PATH)
+        .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
+        .toMap
+        .asScala
+        .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
+        .removeKey(KUBERNETES_TM_CPU_KEY)
+        .removeKey(KUBERNETES_TM_CPU_AMOUNT_KEY)
+        .removeKey(KUBERNETES_JM_CPU_KEY)
+      // Set kubernetes.rest-service.exposed.type configuration for compatibility with native-k8s
+      submitReq.k8sSubmitParam.flinkRestExposedType.foreach {
+        exposedType => result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY -> exposedType.getName
+      }
+      result.toMap
+    }
+
+    // TODO Migrate the construction logic of ingress to here and set it into FlinkDeploymentDef.ingress
+    //  See: org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline Step-8
+    Right(
+      FlinkSessionJobDef(
+        namespace = namespace,
+        name = name,
+        deploymentName = deploymentName,
+        flinkConfiguration = extraFlinkConfiguration,
+        job = jobDef,
+        restartNonce = None
+      ))
+  }
+
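+  // Stop the session job by deleting its FlinkSessionJob CR from the Kubernetes cluster.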
+  def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
+    def richMsg: String => String = s"[flink-delete][appId=${shutDownRequest.clusterId}] " + _
+
+    FlinkK8sOperator.k8sCrOpr
+      .deleteSessionJob(
+        shutDownRequest.kubernetesDeployParam.kubernetesNamespace,
+        shutDownRequest.clusterId)
+      .runIOAsTry match {
+      case Success(rsp) =>
+        logInfo(richMsg("Delete flink job successfully."))
+        rsp
+      case Failure(err) =>
+        logError(
+          richMsg(s"delete flink job fail in 
${shutDownRequest.executionMode.getName}_V2 mode!"),
+          err)
+        throw err
+    }
+    ShutDownResponse()
+  }
+
+}
