This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 095be65b7 [Feature] [Flink-K8s-V2] Refactor the lifecycle control code of Flink clusters on Kubernetes (#3224)
095be65b7 is described below

commit 095be65b7ac9ee7c71c4813fbbf0b2fc7b6be422
Author: caicancai <[email protected]>
AuthorDate: Sun Oct 8 21:18:13 2023 +0800

    [Feature] [Flink-K8s-V2] Refactor the lifecycle control code of Flink clusters on Kubernetes (#3224)
---
 .../core/service/impl/FlinkClusterServiceImpl.java |   1 +
 .../flink/client/bean/DeployRequest.scala          |   1 +
 .../client/impl/KubernetesSessionClientV2.scala    | 136 ++++++++++++++++++++-
 3 files changed, 136 insertions(+), 2 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 57360757d..3dabd573d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -436,6 +436,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
             flinkCluster.getFlinkExecutionModeEnum(),
             flinkCluster.getProperties(),
             flinkCluster.getClusterId(),
+            flinkCluster.getId(),
             getKubernetesDeployDesc(flinkCluster, "start"));
     log.info("Deploy cluster request " + deployRequest);
     Future<DeployResponse> future = executorService.submit(() -> FlinkClient.deploy(deployRequest));
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
index 13479ea11..f38925599 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
@@ -34,6 +34,7 @@ case class DeployRequest(
     executionMode: FlinkExecutionMode,
     properties: JavaMap[String, Any],
     clusterId: String,
+    id: Long,
     @Nullable k8sDeployParam: KubernetesDeployParam) {
 
   private[client] lazy val hdfsWorkspace = {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index 176c801f5..3cbfbdd2e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -20,7 +20,7 @@ import org.apache.streampark.common.util.Logger
 import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
 import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
 import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.v2.model.FlinkSessionJobDef
+import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, FlinkSessionJobDef, JobManagerDef, TaskManagerDef}
 import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ClusterKey
 import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
 import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
@@ -30,11 +30,14 @@ import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.configuration._
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
 import zio.ZIO
 
+import scala.collection.mutable
 import scala.jdk.CollectionConverters.mapAsScalaMapConverter
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 /** Flink K8s session mode app operation client via Flink K8s Operator */
 object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
@@ -120,6 +123,34 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
       ))
   }
 
+  @throws[Throwable]
+  def deploy(deployRequest: DeployRequest): DeployResponse = {
+
+    val richMsg: String => String = s"[flink-submit][appId=${deployRequest.id}] " + _
+
+    val flinkConfig =
+      extractConfiguration(deployRequest.flinkVersion.flinkHome, deployRequest.properties)
+    // Convert to FlinkDeployment CR definition
+    val flinkDeployDef = genFlinkDeployDef(deployRequest, flinkConfig) match {
+      case Right(result) => result
+      case Left(errMsg) =>
+        throw new IllegalArgumentException(
+          richMsg(s"Error occurred while parsing parameters:$errMsg"))
+    }
+
+    // Submit FlinkDeployment CR to Kubernetes
+    FlinkK8sOperator.deployCluster(deployRequest.id, flinkDeployDef).runIOAsTry match {
+      case Success(_) =>
+        logInfo(richMsg("Flink Cluster has been submitted successfully."))
+        DeployResponse(null, deployRequest.clusterId)
+      case Failure(err) =>
+        logError(
+          richMsg(s"Submit Flink Cluster fail 
in${deployRequest.executionMode.getName}_V2 mode!"),
+          err)
+        throw err
+    }
+  }
+
   /** Shutdown Flink cluster. */
   @throws[Throwable]
   def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
@@ -144,4 +175,105 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
     }
   }
 
+  private def genFlinkDeployDef(
+      deployRequest: DeployRequest,
+      originFlinkConfig: Configuration): Either[FailureMessage, FlinkDeploymentDef] = {
+    val flinkConfObj = originFlinkConfig.clone()
+    val flinkConfMap = originFlinkConfig.toMap.asScala.toMap
+
+    val namespace = Option(deployRequest.k8sDeployParam.kubernetesNamespace)
+      .getOrElse("default")
+
+    val name = Option(deployRequest.k8sDeployParam.clusterId)
+      .filter(str => StringUtils.isNotBlank(str))
+      .getOrElse(return Left("Kubernetes CR name should not be empty"))
+
+    val imagePullPolicy = flinkConfObj
+      .getOption(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
+      .map(_.toString)
+
+    val image = Option(deployRequest.k8sDeployParam.flinkImage)
+      .filter(str => StringUtils.isNotBlank(str))
+      .getOrElse(return Left("Flink base image should not be empty"))
+
+    val serviceAccount = Option(deployRequest.k8sDeployParam.serviceAccount)
+      .getOrElse(FlinkDeploymentDef.DEFAULT_SERVICE_ACCOUNT)
+
+    val flinkVersion = Option(deployRequest.flinkVersion.majorVersion)
+      .map(majorVer => "V" + majorVer.replace(".", "_"))
+      .flatMap(v => FlinkVersion.values().find(_.name() == v))
+      .getOrElse(
+        return Left(s"Unsupported Flink 
version:${deployRequest.flinkVersion.majorVersion}"))
+
+    val jobManager = {
+      val cpu = flinkConfMap
+        .get(KUBERNETES_JM_CPU_AMOUNT_KEY)
+        .orElse(flinkConfMap.get(KUBERNETES_JM_CPU_KEY))
+        .flatMap(value => Try(value.toDouble).toOption)
+        .getOrElse(KUBERNETES_JM_CPU_DEFAULT)
+
+      val mem = flinkConfObj
+        .getOption(JobManagerOptions.TOTAL_PROCESS_MEMORY)
+        .map(_.toString)
+        .getOrElse(KUBERNETES_JM_MEMORY_DEFAULT)
+
+      JobManagerDef(cpu = cpu, memory = mem, ephemeralStorage = None, podTemplate = None)
+    }
+
+    val taskManager = {
+      val cpu = flinkConfMap
+        .get(KUBERNETES_TM_CPU_AMOUNT_KEY)
+        .orElse(flinkConfMap.get(KUBERNETES_TM_CPU_KEY))
+        .flatMap(value => Try(value.toDouble).toOption)
+        .getOrElse(KUBERNETES_TM_CPU_DEFAULT)
+
+      val mem = flinkConfObj
+        .getOption(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
+        .map(_.toString)
+        .getOrElse(KUBERNETES_TM_MEMORY_DEFAULT)
+
+      TaskManagerDef(cpu = cpu, memory = mem, ephemeralStorage = None, podTemplate = None)
+    }
+
+    val extraFlinkConfiguration = {
+      // Remove conflicting configuration items
+      val result: mutable.Map[String, String] = flinkConfObj
+        .remove(DeploymentOptions.TARGET)
+        .remove(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
+        .remove(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
+        .remove(JobManagerOptions.TOTAL_PROCESS_MEMORY)
+        .remove(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
+        .remove(PipelineOptions.JARS)
+        .remove(CoreOptions.DEFAULT_PARALLELISM)
+        .remove(ApplicationConfiguration.APPLICATION_ARGS)
+        .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
+        .remove(SavepointConfigOptions.SAVEPOINT_PATH)
+        .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
+        .toMap
+        .asScala
+        .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
+        .removeKey(KUBERNETES_JM_CPU_KEY)
+        .removeKey(KUBERNETES_TM_CPU_AMOUNT_KEY)
+        .removeKey(KUBERNETES_TM_CPU_KEY)
+      Option(deployRequest.k8sDeployParam.flinkRestExposedType).foreach {
+        exposedType => result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY -> exposedType.getName
+      }
+      result.toMap
+    }
+
+    Right(
+      FlinkDeploymentDef(
+        namespace = namespace,
+        name = name,
+        image = image,
+        imagePullPolicy = imagePullPolicy,
+        serviceAccount = serviceAccount,
+        flinkVersion = flinkVersion,
+        jobManager = jobManager,
+        taskManager = taskManager,
+        flinkConfiguration = extraFlinkConfiguration,
+        extJarPaths = Array.empty
+      ))
+  }
+
 }
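
Editor's note: for readers skimming the patch, below is a minimal, hypothetical sketch of the FlinkDeploymentDef that the new genFlinkDeployDef method assembles for a session cluster. The field names and types mirror the constructor call in the diff above; all literal values (namespace, image, memory sizes, exposed type) are illustrative assumptions, not values taken from this commit.

    // Illustrative sketch only (not part of this commit); literal values are made up.
    import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, JobManagerDef, TaskManagerDef}
    import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion

    val sessionClusterDef = FlinkDeploymentDef(
      namespace = "default",                  // k8sDeployParam.kubernetesNamespace, falling back to "default"
      name = "my-session-cluster",            // k8sDeployParam.clusterId (the Kubernetes CR name)
      image = "flink:1.17",                   // k8sDeployParam.flinkImage
      imagePullPolicy = Some("IfNotPresent"), // kubernetes.container.image.pull-policy, if configured
      serviceAccount = "flink",               // k8sDeployParam.serviceAccount or the default
      // Resolved the same way as in genFlinkDeployDef: "V" + majorVersion.replace(".", "_")
      flinkVersion = FlinkVersion.values().find(_.name() == "V1_17").get,
      jobManager = JobManagerDef(cpu = 1.0, memory = "1600m", ephemeralStorage = None, podTemplate = None),
      taskManager = TaskManagerDef(cpu = 1.0, memory = "1728m", ephemeralStorage = None, podTemplate = None),
      // Remaining Flink options after the conflicting keys have been stripped
      flinkConfiguration = Map("kubernetes.rest-service.exposed.type" -> "LoadBalancer"),
      extJarPaths = Array.empty
    )

The new deploy(deployRequest) method then hands such a definition to FlinkK8sOperator.deployCluster(deployRequest.id, flinkDeployDef), which submits it as a FlinkDeployment CR to Kubernetes.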
