This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new 5626ac87b Fix Issue #3638 : Flink native k8s application task with
INITIALIZING status (#3639)
5626ac87b is described below
commit 5626ac87bb093430441c3ec6e017a9058f31f82c
Author: Yuze <[email protected]>
AuthorDate: Fri Mar 29 11:09:44 2024 +0800
Fix Issue #3638 : Flink native k8s application task with INITIALIZING
status (#3639)
Co-authored-by: ze.yu <[email protected]>
---
.../flink/kubernetes/KubernetesRetriever.scala | 19 +++++-------
.../kubernetes/ingress/IngressController.scala | 6 ++--
.../flink/kubernetes/ingress/IngressStrategy.scala | 5 ++-
.../kubernetes/ingress/IngressStrategyV1.scala | 36 ++++++++++++++++------
.../ingress/IngressStrategyV1beta1.scala | 10 +++---
5 files changed, 47 insertions(+), 29 deletions(-)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index f57060795..c93a84923 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -136,17 +136,14 @@ object KubernetesRetriever extends Logger {
/** retrieve flink jobManager rest url */
def retrieveFlinkRestUrl(clusterKey: ClusterKey): Option[String] = {
- Utils.using(
- KubernetesRetriever
- .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace,
clusterKey.executeMode)
- .getOrElse(return None)) {
- client =>
- val url =
- IngressController.getIngressUrl(clusterKey.namespace,
clusterKey.clusterId, client)
- logger.info(s"retrieve flink jobManager rest url: $url")
- client.close()
- Some(url)
- }
+ val url =
+ IngressController.getIngressUrl(clusterKey.namespace,
clusterKey.clusterId) {
+ KubernetesRetriever
+ .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace,
clusterKey.executeMode)
+ .getOrElse(return None)
+ }
+ logger.info(s"retrieve flink jobManager rest url: $url")
+ Some(url)
}
def getSessionClusterIngressURL(namespace: String, clusterId: String):
String = {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index 8bf576329..9394bcd6a 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -47,9 +47,9 @@ object IngressController extends Logger {
def getIngressUrl(
nameSpace: String,
- clusterId: String,
- clusterClient: ClusterClient[_]): String = {
- ingressStrategy.getIngressUrl(nameSpace, clusterId, clusterClient)
+ clusterId: String
+ )(clusterClient: => ClusterClient[_]): String = {
+ ingressStrategy.getIngressUrl(nameSpace, clusterId)(clusterClient)
}
def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates:
String): String = {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index ade4c8cae..0d2e863f5 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -28,9 +28,12 @@ import java.io.File
trait IngressStrategy {
+ val REST_SERVICE_IDENTIFICATION = "rest"
+
lazy val ingressClass: String =
InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
- def getIngressUrl(nameSpace: String, clusterId: String, clusterClient:
ClusterClient[_]): String
+ def getIngressUrl(nameSpace: String, clusterId: String)(
+ clusterClient: => ClusterClient[_]): String
def configureIngress(domainName: String, clusterId: String, nameSpace:
String): Unit
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index 9e08338e9..1ac15bfdc 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -29,10 +29,8 @@ import scala.util.Try
class IngressStrategyV1 extends IngressStrategy {
- override def getIngressUrl(
- nameSpace: String,
- clusterId: String,
- clusterClient: ClusterClient[_]): String = {
+ override def getIngressUrl(nameSpace: String, clusterId: String)(
+ clusterClient: => ClusterClient[_]): String = {
Utils.using(new DefaultKubernetesClient) {
client =>
@@ -41,7 +39,9 @@ class IngressStrategyV1 extends IngressStrategy {
.map(ingress => ingress.getSpec.getRules.get(0))
.map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
.map { case (host, path) => s"http://$host$path" }
- .getOrElse(clusterClient.getWebInterfaceURL)
+ .getOrElse {
+ Utils.using(clusterClient)(client => client.getWebInterfaceURL)
+ }
}.recover {
case e =>
throw new RuntimeException(s"[StreamPark] get ingressUrlAddress
error: $e")
@@ -49,10 +49,28 @@ class IngressStrategyV1 extends IngressStrategy {
}
}
+ private[this] def touchIngressBackendRestPort(
+ client: DefaultKubernetesClient,
+ clusterId: String,
+ nameSpace: String): Int = {
+ var ports = client.services
+ .inNamespace(nameSpace)
+ .withName(s"$clusterId-$REST_SERVICE_IDENTIFICATION")
+ .get()
+ .getSpec
+ .getPorts
+ .asScala
+ ports =
+ ports.filter(servicePort =>
servicePort.getName.equalsIgnoreCase(REST_SERVICE_IDENTIFICATION))
+ ports.map(servicePort => servicePort.getTargetPort.getIntVal).head
+ }
+
override def configureIngress(domainName: String, clusterId: String,
nameSpace: String): Unit = {
Utils.using(new DefaultKubernetesClient) {
client =>
val ownerReference = getOwnerReference(nameSpace, clusterId, client)
+ val ingressBackendRestServicePort =
+ touchIngressBackendRestPort(client, clusterId, nameSpace)
val ingress = new IngressBuilder()
.withNewMetadata()
.withName(clusterId)
@@ -70,9 +88,9 @@ class IngressStrategyV1 extends IngressStrategy {
.withPathType("ImplementationSpecific")
.withNewBackend()
.withNewService()
- .withName(s"$clusterId-rest")
+ .withName(s"$clusterId-$REST_SERVICE_IDENTIFICATION")
.withNewPort()
- .withName("rest")
+ .withNumber(ingressBackendRestServicePort)
.endPort()
.endService()
.endBackend()
@@ -82,9 +100,9 @@ class IngressStrategyV1 extends IngressStrategy {
.withPathType("ImplementationSpecific")
.withNewBackend()
.withNewService()
- .withName(s"$clusterId-rest")
+ .withName(s"$clusterId-$REST_SERVICE_IDENTIFICATION")
.withNewPort()
- .withName("rest")
+ .withNumber(ingressBackendRestServicePort)
.endPort()
.endService()
.endBackend()
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 1cfeceb68..408210a0c 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -31,10 +31,8 @@ import scala.util.Try
class IngressStrategyV1beta1 extends IngressStrategy {
- override def getIngressUrl(
- nameSpace: String,
- clusterId: String,
- clusterClient: ClusterClient[_]): String = {
+ override def getIngressUrl(nameSpace: String, clusterId: String)(
+ clusterClient: => ClusterClient[_]): String = {
Utils.using(new DefaultKubernetesClient) {
client =>
@@ -43,7 +41,9 @@ class IngressStrategyV1beta1 extends IngressStrategy {
.map(ingress => ingress.getSpec.getRules.get(0))
.map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
.map { case (host, path) => s"http://$host$path" }
- .getOrElse(clusterClient.getWebInterfaceURL)
+ .getOrElse {
+ Utils.using(clusterClient)(client => client.getWebInterfaceURL)
+ }
}.recover {
case e =>
throw new RuntimeException(s"[StreamPark] get ingressUrlAddress
error: $e")