This is an automated email from the ASF dual-hosted git repository.

linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8ff30e3de Refactor the lifecycle control of flink application mode -dosubmit (#2994)
8ff30e3de is described below

commit 8ff30e3de6c233adc5601839aaa73c4d39bc78cd
Author: caicancai <[email protected]>
AuthorDate: Thu Sep 7 18:39:24 2023 +0800

    Refactor the lifecycle control of flink application mode -dosubmit (#2994)
    
    * remove scala2.11
    
    * [Feature] [Flink-K8s-V2] Refactor the lifecycle control of Flink application-mode_dosubmit
    
    * add v2 dependency
    
    * mvn spotless:apply
    
    * throw exception
    
    * optimization
    
    * Code optimization
    
    * add comment
    
    * add comment
    
    * add zio dependency
    
    * resolve dependency conflicts
    
    * add KubernetesNativeApplicationClient_V2
    
    * Parameter optimization and add license
    
    * Parameter optimization and add license
    
    * add flinkconfig param
    
    * Rename to KubernetesNativeApplicationV2
---
 .../streampark-flink-client-core/pom.xml           |  27 +++++
 .../impl/KubernetesNativeApplicationClientV2.scala | 121 +++++++++++++++++++++
 2 files changed, 148 insertions(+)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
index 9e942fc1f..506d428df 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/pom.xml
@@ -51,6 +51,18 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-kubernetes-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>jackson-dataformat-yaml</artifactId>
+                    <groupId>com.fasterxml.jackson.dataformat</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-yarn${scala.binary.flink.version}</artifactId>
@@ -94,6 +106,21 @@
             <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
         </dependency>
 
+        <!-- ZIO -->
+        <dependency>
+            <groupId>dev.zio</groupId>
+            <artifactId>zio-logging_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>dev.zio</groupId>
+            <artifactId>zio-streams_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>dev.zio</groupId>
+            <artifactId>zio-concurrent_${scala.binary.version}</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
new file mode 100644
index 000000000..f43e88f0b
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClientV2.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.client.impl
+
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, 
ExecutionMode}
+import org.apache.streampark.common.zio.ZIOExt.IOOps
+import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
+import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, 
JobDef, JobManagerDef, TaskManagerDef}
+import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
+import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.configuration.{Configuration, DeploymentOptions, 
ExecutionOptions, JobManagerOptions, PipelineOptions, TaskManagerOptions}
+import org.apache.flink.v1beta1.FlinkDeploymentSpec.FlinkVersion
+
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+
+object KubernetesNativeApplicationClientV2 extends KubernetesNativeClientTrait 
{
+  @throws[Exception]
+  override def doSubmit(
+      submitRequest: SubmitRequest,
+      flinkConfig: Configuration): SubmitResponse = {
+
+    // require parameters
+    require(
+      StringUtils.isNotBlank(submitRequest.k8sSubmitParam.clusterId),
+      s"[flink-submit] submit flink job failed, clusterId is null, 
mode=${flinkConfig.get(DeploymentOptions.TARGET)}"
+    )
+
+    // check the last building result
+    submitRequest.checkBuildResult()
+
+    try {
+      val spec: FlinkDeploymentDef = convertFlinkDeploymentDef(submitRequest, 
flinkConfig)
+      FlinkK8sOperator.deployApplicationJob(submitRequest.id, spec).runIO
+      val result = SubmitResponse(null, flinkConfig.toMap, 
submitRequest.jobId, null)
+      logInfo(
+        s"[flink-submit] flink job has been submitted. 
${flinkConfIdentifierInfo(flinkConfig)}")
+      result
+    } catch {
+      case e: Exception =>
+        logError(s"submit flink job fail in ${submitRequest.executionMode} 
mode")
+        throw e
+    } finally {}
+  }
+
+  override def doCancel(
+      cancelRequest: CancelRequest,
+      flinkConfig: Configuration): CancelResponse = {
+    flinkConfig.safeSet(
+      DeploymentOptions.TARGET,
+      ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
+    super.doCancel(cancelRequest, flinkConfig)
+  }
+
+  override def doTriggerSavepoint(
+      request: TriggerSavepointRequest,
+      flinkConf: Configuration): SavepointResponse = {
+    flinkConf.safeSet(DeploymentOptions.TARGET, 
ExecutionMode.KUBERNETES_NATIVE_APPLICATION.getName)
+    super.doTriggerSavepoint(request, flinkConf)
+  }
+
+  private[this] def convertFlinkDeploymentDef(
+      submitRequest: SubmitRequest,
+      flinkConfig: Configuration): FlinkDeploymentDef = {
+    val spec = FlinkDeploymentDef(
+      name = submitRequest.appName,
+      namespace = submitRequest.k8sSubmitParam.kubernetesNamespace,
+      image = 
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].flinkImageTag,
+      flinkVersion = Option(submitRequest.flinkVersion.majorVersion)
+        .map(_.replace(".", "_"))
+        .map("V" + _)
+        .flatMap(v => FlinkVersion.values().find(_.name() == v)) match {
+        case Some(version) => version
+        case None => throw new IllegalArgumentException("Flink version not 
found")
+      },
+      jobManager = JobManagerDef(
+        cpu = 1,
+        memory = 
flinkConfig.get(JobManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+      taskManager = TaskManagerDef(
+        cpu = 1,
+        memory = 
flinkConfig.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY).toString),
+      job = Option(
+        JobDef(
+          jarURI =
+            
submitRequest.buildResult.asInstanceOf[DockerImageBuildResponse].dockerInnerMainJarPath,
+          parallelism = 1,
+          args = 
Array(flinkConfig.toMap.get("$internal.application.program-args")),
+          entryClass = Some(submitRequest.appMain),
+          initialSavepointPath = Some(submitRequest.savePoint),
+          allowNonRestoredState = Some(submitRequest.allowNonRestoredState)
+        )),
+      extJarPaths = submitRequest.userJarFile match {
+        case null => Array.empty[String]
+        case file => Array(file.getAbsolutePath)
+      },
+      flinkConfiguration = submitRequest.extraParameter match {
+        case null => Map.empty
+        case e => e.asScala.map { case (key, value) => key -> value.toString 
}.toMap
+      }
+    )
+    spec
+  }
+}

Reply via email to