This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new d736d2b64 [feat][flink-k8s-v2] Migrate the creation process of the 
flink deploy ingress to the Flink kubernetes operator (#2879) (#3054)
d736d2b64 is described below

commit d736d2b64a1b13fbb4b18b883b9c08cf53d08e99
Author: Linying Assad <[email protected]>
AuthorDate: Thu Sep 14 00:01:35 2023 -0500

    [feat][flink-k8s-v2] Migrate the creation process of the flink deploy 
ingress to the Flink kubernetes operator (#2879) (#3054)
---
 .../impl/ApplicationActionServiceImpl.java         | 10 ++++++--
 .../core/utils/FlinkK8sDataTypeConverterStub.java  | 11 ++++++++
 .../core/utils/FlinkK8sDataTypeConverter.scala     | 29 +++++++++++++++++++++-
 .../flink/client/bean/KubernetesSubmitParam.scala  | 10 +++++---
 .../impl/KubernetesApplicationClientV2.scala       |  7 +++---
 .../impl/KubernetesNativeApplicationClient.scala   |  1 +
 .../impl/KubernetesNativeSessionClient.scala       |  1 +
 7 files changed, 60 insertions(+), 9 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index e9d5d6906..096339aac 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.DevelopmentMode;
 import org.apache.streampark.common.enums.ExecutionMode;
@@ -65,6 +66,7 @@ import 
org.apache.streampark.console.core.service.application.ApplicationActionS
 import 
org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
 import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
 import org.apache.streampark.flink.client.FlinkClient;
 import org.apache.streampark.flink.client.bean.CancelRequest;
 import org.apache.streampark.flink.client.bean.CancelResponse;
@@ -158,6 +160,8 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
 
   @Autowired private ResourceService resourceService;
 
+  @Autowired private FlinkK8sDataTypeConverterStub flinkK8sDataTypeConverter;
+
   private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap =
       new ConcurrentHashMap<>();
 
@@ -422,7 +426,8 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
             application.getK8sName(),
             application.getK8sNamespace(),
             application.getFlinkImage(),
-            application.getK8sRestExposedTypeEnum());
+            application.getK8sRestExposedTypeEnum(),
+            flinkK8sDataTypeConverter.genDefaultFlinkDeploymentIngressDef());
 
     Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, 
application);
     String flinkUserJar = userJarAndAppConf.f0;
@@ -527,7 +532,8 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
             })
         .whenComplete(
             (t, e) -> {
-              if 
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+              if (!K8sFlinkConfig.isV2Enabled()
+                  && 
ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                 String domainName = settingService.getIngressModeDefault();
                 if (StringUtils.isNotBlank(domainName)) {
                   try {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
index c9d9bf8a8..124696c06 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
@@ -17,8 +17,19 @@
 
 package org.apache.streampark.console.core.utils;
 
+import org.apache.streampark.flink.kubernetes.v2.model.IngressDef;
+
+import javax.annotation.Nullable;
+
 public interface FlinkK8sDataTypeConverterStub {
 
   /** Create default name for Flink SessionJob CR for k8s-native 
compatibility. */
   String genSessionJobK8sCRName(String clusterId);
+
+  /**
+   * Create default FlinkDeployment CR definition, Used for compatibility with 
streampark flink k8s
+   * v1 logic.
+   */
+  @Nullable
+  IngressDef genDefaultFlinkDeploymentIngressDef();
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
 
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
index 0813b5373..846ca61b2 100644
--- 
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
+++ 
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
@@ -17,9 +17,11 @@
 
 package org.apache.streampark.console.core.utils
 
+import org.apache.streampark.common.conf.{InternalConfigHolder, K8sFlinkConfig}
 import org.apache.streampark.common.enums.{ClusterState, ExecutionMode}
 import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
 import org.apache.streampark.console.core.enums.FlinkAppState
+import org.apache.streampark.console.core.service.SettingService
 import 
org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter.genSessionJobCRName
 import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV
 import org.apache.streampark.flink.kubernetes.v2.model._
@@ -27,15 +29,40 @@ import 
org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState
 import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ClusterKey
 
 import org.apache.commons.lang3.StringUtils
+import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Component
 
+import javax.annotation.Nullable
+
 import java.util.UUID
 
 import scala.util.Try
 
 @Component
-class FlinkK8sDataTypeConverter() extends FlinkK8sDataTypeConverterStub {
+class FlinkK8sDataTypeConverter @Autowired() (
+    settingService: SettingService
+) extends FlinkK8sDataTypeConverterStub {
+
   override def genSessionJobK8sCRName(clusterId: String): String = 
genSessionJobCRName(clusterId)
+
+  /**
+   * Create default FlinkDeployment CR definition,
+   * Used for compatibility with streampark flink k8s v1 logic.
+   */
+  @throws[Exception] @Nullable
+  override def genDefaultFlinkDeploymentIngressDef(): IngressDef = {
+    val domainName = settingService.getIngressModeDefault
+    if (StringUtils.isBlank(domainName)) null
+    else {
+      val ingressClass = 
InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
+      IngressDef(
+        template = domainName + "/{{namespace}}/{{name}}(/|$)(.*)",
+        annotations = Map("nginx.ingress.kubernetes.io/rewrite-target" -> 
"/$2"),
+        className = Option(ingressClass)
+      )
+    }
+  }
+
 }
 
 object FlinkK8sDataTypeConverter {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
index f959b0c32..e3351200f 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
@@ -18,6 +18,7 @@
 package org.apache.streampark.flink.client.bean
 
 import org.apache.streampark.common.enums.FlinkK8sRestExposedType
+import org.apache.streampark.flink.kubernetes.v2.model.IngressDef
 
 import javax.annotation.Nullable
 
@@ -50,7 +51,8 @@ case class KubernetesSubmitParam(
     taskManagerEphemeralStorage: Option[String] = None,
     taskManagerPodTemplate: Option[String] = None,
     logConfiguration: JMap[String, String] = new util.HashMap[String, 
String](),
-    flinkRestExposedType: Option[FlinkK8sRestExposedType] = None
+    flinkRestExposedType: Option[FlinkK8sRestExposedType] = None,
+    ingressDefinition: Option[IngressDef] = None
 )
 
 object KubernetesSubmitParam {
@@ -70,12 +72,14 @@ object KubernetesSubmitParam {
       kubernetesName: String,
       kubernetesNamespace: String,
       baseImage: String,
-      @Nullable flinkRestExposedType: FlinkK8sRestExposedType): 
KubernetesSubmitParam =
+      @Nullable flinkRestExposedType: FlinkK8sRestExposedType,
+      @Nullable ingressDefinition: IngressDef): KubernetesSubmitParam =
     KubernetesSubmitParam(
       clusterId = clusterId,
       kubernetesNamespace = kubernetesNamespace,
       kubernetesName = Option(kubernetesName),
       baseImage = Some(baseImage),
-      flinkRestExposedType = Option(flinkRestExposedType)
+      flinkRestExposedType = Option(flinkRestExposedType),
+      ingressDefinition = Option(ingressDefinition)
     )
 }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
index 20f8d6aa5..a19368acd 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -98,6 +98,8 @@ object KubernetesApplicationClientV2 extends 
KubernetesClientV2Trait with Logger
       .filter(str => StringUtils.isNotBlank(str))
       .getOrElse(return Left("Flink base image should not be empty"))
 
+    val ingress = submitReq.k8sSubmitParam.ingressDefinition
+
     val imagePullPolicy = flinkConfObj
       .getOption(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
       .map(_.toString)
@@ -220,8 +222,6 @@ object KubernetesApplicationClientV2 extends 
KubernetesClientV2Trait with Logger
       result.toMap
     }
 
-    // TODO Migrate the construction logic of ingress to here and set it into 
FlinkDeploymentDef.ingress
-    //  See: 
org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline
 Step-8
     Right(
       FlinkDeploymentDef(
         namespace = namespace,
@@ -236,7 +236,8 @@ object KubernetesApplicationClientV2 extends 
KubernetesClientV2Trait with Logger
         logConfiguration = logConfiguration,
         podTemplate = podTemplate,
         job = Some(jobDef),
-        extJarPaths = Array.empty
+        extJarPaths = Array.empty,
+        ingress = ingress
       ))
   }
 
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index be57adb41..ea4905d9c 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.KubernetesClusterDescriptor
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
 
 /** kubernetes native application mode submit */
+@deprecated("use KubernetesApplicationClientV2 instead")
 object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
 
   @throws[Exception]
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 2daa1f68a..c4c9c6e41 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -41,6 +41,7 @@ import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
 
 /** kubernetes native session mode submit */
+@deprecated("use KubernetesSessionClientV2 instead")
 object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with 
Logger {
 
   @throws[Exception]

Reply via email to