This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 095be65b7 [Feature] [Flink-K8s-V2] Refactor the lifecycle control code
of Flink clusters on Kubernetes (#3224)
095be65b7 is described below
commit 095be65b7ac9ee7c71c4813fbbf0b2fc7b6be422
Author: caicancai <[email protected]>
AuthorDate: Sun Oct 8 21:18:13 2023 +0800
[Feature] [Flink-K8s-V2] Refactor the lifecycle control code of Flink
clusters on Kubernetes (#3224)
---
.../core/service/impl/FlinkClusterServiceImpl.java | 1 +
.../flink/client/bean/DeployRequest.scala | 1 +
.../client/impl/KubernetesSessionClientV2.scala | 136 ++++++++++++++++++++-
3 files changed, 136 insertions(+), 2 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 57360757d..3dabd573d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -436,6 +436,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.getFlinkExecutionModeEnum(),
flinkCluster.getProperties(),
flinkCluster.getClusterId(),
+ flinkCluster.getId(),
getKubernetesDeployDesc(flinkCluster, "start"));
log.info("Deploy cluster request " + deployRequest);
Future<DeployResponse> future = executorService.submit(() ->
FlinkClient.deploy(deployRequest));
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
index 13479ea11..f38925599 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/DeployRequest.scala
@@ -34,6 +34,7 @@ case class DeployRequest(
executionMode: FlinkExecutionMode,
properties: JavaMap[String, Any],
clusterId: String,
+ id: Long,
@Nullable k8sDeployParam: KubernetesDeployParam) {
private[client] lazy val hdfsWorkspace = {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index 176c801f5..3cbfbdd2e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -20,7 +20,7 @@ import org.apache.streampark.common.util.Logger
import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.v2.model.FlinkSessionJobDef
+import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef,
FlinkSessionJobDef, JobManagerDef, TaskManagerDef}
import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ClusterKey
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
@@ -30,11 +30,14 @@ import
org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.configuration._
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
+import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
import zio.ZIO
+import scala.collection.mutable
import scala.jdk.CollectionConverters.mapAsScalaMapConverter
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
/** Flink K8s session mode app operation client via Flink K8s Operator */
object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
@@ -120,6 +123,34 @@ object KubernetesSessionClientV2 extends
KubernetesClientV2Trait with Logger {
))
}
+  /**
+   * Deploy a Flink cluster by converting the request into a FlinkDeployment CR definition
+   * and submitting it through `FlinkK8sOperator.deployCluster`.
+   *
+   * @param deployRequest
+   *   the deploy request; `deployRequest.id` is passed as the first argument of
+   *   `deployCluster` and is also embedded in every log / error message prefix.
+   * @return
+   *   `DeployResponse(null, deployRequest.clusterId)` once the submission succeeded.
+   * @throws IllegalArgumentException
+   *   when `genFlinkDeployDef` rejects the request parameters.
+   * @throws Throwable
+   *   the failure produced by the submission, rethrown unchanged after being logged.
+   */
+  @throws[Throwable]
+  def deploy(deployRequest: DeployRequest): DeployResponse = {
+
+    // Prefix shared by all messages emitted for this request.
+    val richMsg: String => String = s"[flink-submit][appId=${deployRequest.id}] " + _
+
+    val flinkConfig =
+      extractConfiguration(deployRequest.flinkVersion.flinkHome, deployRequest.properties)
+    // Convert to FlinkDeployment CR definition
+    val flinkDeployDef = genFlinkDeployDef(deployRequest, flinkConfig) match {
+      case Right(result) => result
+      case Left(errMsg) =>
+        throw new IllegalArgumentException(
+          richMsg(s"Error occurred while parsing parameters:$errMsg"))
+    }
+
+    // Submit FlinkDeployment CR to Kubernetes
+    FlinkK8sOperator.deployCluster(deployRequest.id, flinkDeployDef).runIOAsTry match {
+      case Success(_) =>
+        logInfo(richMsg("Flink Cluster has been submitted successfully."))
+        DeployResponse(null, deployRequest.clusterId)
+      case Failure(err) =>
+        // A space separates "in" from the mode name; without it the two words were glued together.
+        logError(
+          richMsg(s"Submit Flink Cluster fail in ${deployRequest.executionMode.getName}_V2 mode!"),
+          err)
+        throw err
+    }
+  }
+
/** Shutdown Flink cluster. */
@throws[Throwable]
def shutdown(shutDownRequest: ShutDownRequest): ShutDownResponse = {
@@ -144,4 +175,105 @@ object KubernetesSessionClientV2 extends
KubernetesClientV2Trait with Logger {
}
}
+  /**
+   * Build the FlinkDeployment CR definition from the deploy request and the extracted
+   * Flink configuration.
+   *
+   * Validation runs in this order and stops at the first failure, returning `Left(message)`:
+   *   1. `deployRequest.k8sDeployParam` must not be null (it is declared `@Nullable`),
+   *   2. `k8sDeployParam.clusterId` must not be blank (used as the CR name),
+   *   3. `k8sDeployParam.flinkImage` must not be blank,
+   *   4. `flinkVersion.majorVersion`, rewritten as `"V" + major.replace(".", "_")`,
+   *      must equal the name of one of the `FlinkVersion` enum constants.
+   *
+   * @param deployRequest
+   *   the deploy request carrying the Kubernetes parameters and the Flink version.
+   * @param originFlinkConfig
+   *   the Flink configuration; it is cloned before any `remove` call is applied.
+   * @return
+   *   `Right(FlinkDeploymentDef)` on success, otherwise `Left` with the validation message.
+   */
+  private def genFlinkDeployDef(
+      deployRequest: DeployRequest,
+      originFlinkConfig: Configuration): Either[FailureMessage, FlinkDeploymentDef] = {
+    // All removals below are applied to this clone rather than to the object passed in.
+    val flinkConfObj = originFlinkConfig.clone()
+    val flinkConfMap = originFlinkConfig.toMap.asScala.toMap
+
+    for {
+      // Guard against a missing parameter object instead of failing with an NPE.
+      k8sParam <- Option(deployRequest.k8sDeployParam)
+        .toRight[FailureMessage]("Kubernetes deploy parameters should not be empty")
+
+      name <- Option(k8sParam.clusterId)
+        .filter(str => StringUtils.isNotBlank(str))
+        .toRight[FailureMessage]("Kubernetes CR name should not be empty")
+
+      image <- Option(k8sParam.flinkImage)
+        .filter(str => StringUtils.isNotBlank(str))
+        .toRight[FailureMessage]("Flink base image should not be empty")
+
+      flinkVersion <- Option(deployRequest.flinkVersion.majorVersion)
+        .map(majorVer => "V" + majorVer.replace(".", "_"))
+        .flatMap(v => FlinkVersion.values().find(_.name() == v))
+        .toRight[FailureMessage](
+          s"Unsupported Flink version:${deployRequest.flinkVersion.majorVersion}")
+    } yield {
+
+      val namespace = Option(k8sParam.kubernetesNamespace)
+        .getOrElse("default")
+
+      val imagePullPolicy = flinkConfObj
+        .getOption(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
+        .map(_.toString)
+
+      val serviceAccount = Option(k8sParam.serviceAccount)
+        .getOrElse(FlinkDeploymentDef.DEFAULT_SERVICE_ACCOUNT)
+
+      // These values are read before the corresponding options are removed from flinkConfObj.
+      val jobManager = {
+        // The "amount" key is looked up first; an unparsable value falls back to the default.
+        val cpu = flinkConfMap
+          .get(KUBERNETES_JM_CPU_AMOUNT_KEY)
+          .orElse(flinkConfMap.get(KUBERNETES_JM_CPU_KEY))
+          .flatMap(value => Try(value.toDouble).toOption)
+          .getOrElse(KUBERNETES_JM_CPU_DEFAULT)
+
+        val mem = flinkConfObj
+          .getOption(JobManagerOptions.TOTAL_PROCESS_MEMORY)
+          .map(_.toString)
+          .getOrElse(KUBERNETES_JM_MEMORY_DEFAULT)
+
+        JobManagerDef(cpu = cpu, memory = mem, ephemeralStorage = None, podTemplate = None)
+      }
+
+      val taskManager = {
+        val cpu = flinkConfMap
+          .get(KUBERNETES_TM_CPU_AMOUNT_KEY)
+          .orElse(flinkConfMap.get(KUBERNETES_TM_CPU_KEY))
+          .flatMap(value => Try(value.toDouble).toOption)
+          .getOrElse(KUBERNETES_TM_CPU_DEFAULT)
+
+        val mem = flinkConfObj
+          .getOption(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
+          .map(_.toString)
+          .getOrElse(KUBERNETES_TM_MEMORY_DEFAULT)
+
+        TaskManagerDef(cpu = cpu, memory = mem, ephemeralStorage = None, podTemplate = None)
+      }
+
+      val extraFlinkConfiguration = {
+        // Remove conflicting configuration items
+        val result: mutable.Map[String, String] = flinkConfObj
+          .remove(DeploymentOptions.TARGET)
+          .remove(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
+          .remove(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT)
+          .remove(JobManagerOptions.TOTAL_PROCESS_MEMORY)
+          .remove(TaskManagerOptions.TOTAL_PROCESS_MEMORY)
+          .remove(PipelineOptions.JARS)
+          .remove(CoreOptions.DEFAULT_PARALLELISM)
+          .remove(ApplicationConfiguration.APPLICATION_ARGS)
+          .remove(ApplicationConfiguration.APPLICATION_MAIN_CLASS)
+          .remove(SavepointConfigOptions.SAVEPOINT_PATH)
+          .remove(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)
+          .toMap
+          .asScala
+          // Drop all four CPU keys read above: JM and TM, both the "amount" and the plain variant.
+          .removeKey(KUBERNETES_JM_CPU_AMOUNT_KEY)
+          .removeKey(KUBERNETES_JM_CPU_KEY)
+          .removeKey(KUBERNETES_TM_CPU_AMOUNT_KEY)
+          .removeKey(KUBERNETES_TM_CPU_KEY)
+        Option(k8sParam.flinkRestExposedType).foreach {
+          exposedType => result += KUBERNETES_REST_SERVICE_EXPORTED_TYPE_KEY -> exposedType.getName
+        }
+        result.toMap
+      }
+
+      FlinkDeploymentDef(
+        namespace = namespace,
+        name = name,
+        image = image,
+        imagePullPolicy = imagePullPolicy,
+        serviceAccount = serviceAccount,
+        flinkVersion = flinkVersion,
+        jobManager = jobManager,
+        taskManager = taskManager,
+        flinkConfiguration = extraFlinkConfiguration,
+        extJarPaths = Array.empty
+      )
+    }
+  }
+
}