This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new d736d2b64 [feat][flink-k8s-v2] Migrate the creation process of the
flink deploy ingress to the Flink kubernetes operator (#2879) (#3054)
d736d2b64 is described below
commit d736d2b64a1b13fbb4b18b883b9c08cf53d08e99
Author: Linying Assad <[email protected]>
AuthorDate: Thu Sep 14 00:01:35 2023 -0500
[feat][flink-k8s-v2] Migrate the creation process of the flink deploy
ingress to the Flink kubernetes operator (#2879) (#3054)
---
.../impl/ApplicationActionServiceImpl.java | 10 ++++++--
.../core/utils/FlinkK8sDataTypeConverterStub.java | 11 ++++++++
.../core/utils/FlinkK8sDataTypeConverter.scala | 29 +++++++++++++++++++++-
.../flink/client/bean/KubernetesSubmitParam.scala | 10 +++++---
.../impl/KubernetesApplicationClientV2.scala | 7 +++---
.../impl/KubernetesNativeApplicationClient.scala | 1 +
.../impl/KubernetesNativeSessionClient.scala | 1 +
7 files changed, 60 insertions(+), 9 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index e9d5d6906..096339aac 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
@@ -65,6 +66,7 @@ import
org.apache.streampark.console.core.service.application.ApplicationActionS
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
import
org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
+import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
import org.apache.streampark.flink.client.bean.CancelResponse;
@@ -158,6 +160,8 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
@Autowired private ResourceService resourceService;
+ @Autowired private FlinkK8sDataTypeConverterStub flinkK8sDataTypeConverter;
+
private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap =
new ConcurrentHashMap<>();
@@ -422,7 +426,8 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
application.getK8sName(),
application.getK8sNamespace(),
application.getFlinkImage(),
- application.getK8sRestExposedTypeEnum());
+ application.getK8sRestExposedTypeEnum(),
+ flinkK8sDataTypeConverter.genDefaultFlinkDeploymentIngressDef());
Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv,
application);
String flinkUserJar = userJarAndAppConf.f0;
@@ -527,7 +532,8 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
})
.whenComplete(
(t, e) -> {
- if
(ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
+ if (!K8sFlinkConfig.isV2Enabled()
+ &&
ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
String domainName = settingService.getIngressModeDefault();
if (StringUtils.isNotBlank(domainName)) {
try {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
index c9d9bf8a8..124696c06 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverterStub.java
@@ -17,8 +17,19 @@
package org.apache.streampark.console.core.utils;
+import org.apache.streampark.flink.kubernetes.v2.model.IngressDef;
+
+import javax.annotation.Nullable;
+
public interface FlinkK8sDataTypeConverterStub {
/** Create default name for Flink SessionJob CR for k8s-native
compatibility. */
String genSessionJobK8sCRName(String clusterId);
+
+ /**
+ * Create default FlinkDeployment CR definition, Used for compatibility with
streampark flink k8s
+ * v1 logic.
+ */
+ @Nullable
+ IngressDef genDefaultFlinkDeploymentIngressDef();
}
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
index 0813b5373..846ca61b2 100644
---
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
+++
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/FlinkK8sDataTypeConverter.scala
@@ -17,9 +17,11 @@
package org.apache.streampark.console.core.utils
+import org.apache.streampark.common.conf.{InternalConfigHolder, K8sFlinkConfig}
import org.apache.streampark.common.enums.{ClusterState, ExecutionMode}
import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
import org.apache.streampark.console.core.enums.FlinkAppState
+import org.apache.streampark.console.core.service.SettingService
import
org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter.genSessionJobCRName
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV
import org.apache.streampark.flink.kubernetes.v2.model._
@@ -27,15 +29,40 @@ import
org.apache.streampark.flink.kubernetes.v2.model.EvalJobState.EvalJobState
import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.ClusterKey
import org.apache.commons.lang3.StringUtils
+import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
+import javax.annotation.Nullable
+
import java.util.UUID
import scala.util.Try
@Component
-class FlinkK8sDataTypeConverter() extends FlinkK8sDataTypeConverterStub {
+class FlinkK8sDataTypeConverter @Autowired() (
+ settingService: SettingService
+) extends FlinkK8sDataTypeConverterStub {
+
override def genSessionJobK8sCRName(clusterId: String): String =
genSessionJobCRName(clusterId)
+
+ /**
+ * Create default FlinkDeployment CR definition,
+ * Used for compatibility with streampark flink k8s v1 logic.
+ */
+ @throws[Exception] @Nullable
+ override def genDefaultFlinkDeploymentIngressDef(): IngressDef = {
+ val domainName = settingService.getIngressModeDefault
+ if (StringUtils.isBlank(domainName)) null
+ else {
+ val ingressClass =
InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
+ IngressDef(
+ template = domainName + "/{{namespace}}/{{name}}(/|$)(.*)",
+ annotations = Map("nginx.ingress.kubernetes.io/rewrite-target" ->
"/$2"),
+ className = Option(ingressClass)
+ )
+ }
+ }
+
}
object FlinkK8sDataTypeConverter {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
index f959b0c32..e3351200f 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/KubernetesSubmitParam.scala
@@ -18,6 +18,7 @@
package org.apache.streampark.flink.client.bean
import org.apache.streampark.common.enums.FlinkK8sRestExposedType
+import org.apache.streampark.flink.kubernetes.v2.model.IngressDef
import javax.annotation.Nullable
@@ -50,7 +51,8 @@ case class KubernetesSubmitParam(
taskManagerEphemeralStorage: Option[String] = None,
taskManagerPodTemplate: Option[String] = None,
logConfiguration: JMap[String, String] = new util.HashMap[String,
String](),
- flinkRestExposedType: Option[FlinkK8sRestExposedType] = None
+ flinkRestExposedType: Option[FlinkK8sRestExposedType] = None,
+ ingressDefinition: Option[IngressDef] = None
)
object KubernetesSubmitParam {
@@ -70,12 +72,14 @@ object KubernetesSubmitParam {
kubernetesName: String,
kubernetesNamespace: String,
baseImage: String,
- @Nullable flinkRestExposedType: FlinkK8sRestExposedType):
KubernetesSubmitParam =
+ @Nullable flinkRestExposedType: FlinkK8sRestExposedType,
+ @Nullable ingressDefinition: IngressDef): KubernetesSubmitParam =
KubernetesSubmitParam(
clusterId = clusterId,
kubernetesNamespace = kubernetesNamespace,
kubernetesName = Option(kubernetesName),
baseImage = Some(baseImage),
- flinkRestExposedType = Option(flinkRestExposedType)
+ flinkRestExposedType = Option(flinkRestExposedType),
+ ingressDefinition = Option(ingressDefinition)
)
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
index 20f8d6aa5..a19368acd 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -98,6 +98,8 @@ object KubernetesApplicationClientV2 extends
KubernetesClientV2Trait with Logger
.filter(str => StringUtils.isNotBlank(str))
.getOrElse(return Left("Flink base image should not be empty"))
+ val ingress = submitReq.k8sSubmitParam.ingressDefinition
+
val imagePullPolicy = flinkConfObj
.getOption(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY)
.map(_.toString)
@@ -220,8 +222,6 @@ object KubernetesApplicationClientV2 extends
KubernetesClientV2Trait with Logger
result.toMap
}
- // TODO Migrate the construction logic of ingress to here and set it into
FlinkDeploymentDef.ingress
- // See:
org.apache.streampark.flink.packer.pipeline.impl.FlinkK8sApplicationBuildPipeline
Step-8
Right(
FlinkDeploymentDef(
namespace = namespace,
@@ -236,7 +236,8 @@ object KubernetesApplicationClientV2 extends
KubernetesClientV2Trait with Logger
logConfiguration = logConfiguration,
podTemplate = podTemplate,
job = Some(jobDef),
- extJarPaths = Array.empty
+ extJarPaths = Array.empty,
+ ingress = ingress
))
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
index be57adb41..ea4905d9c 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeApplicationClient.scala
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.KubernetesClusterDescriptor
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
/** kubernetes native application mode submit */
+@deprecated("use KubernetesApplicationClientV2 instead")
object KubernetesNativeApplicationClient extends KubernetesNativeClientTrait {
@throws[Exception]
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 2daa1f68a..c4c9c6e41 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -41,6 +41,7 @@ import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
/** kubernetes native session mode submit */
+@deprecated("use KubernetesSessionClientV2 instead")
object KubernetesNativeSessionClient extends KubernetesNativeClientTrait with
Logger {
@throws[Exception]