This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 32e38f166 [Improve] ingress improvement
32e38f166 is described below
commit 32e38f1669fe265d5bcf86140943ded26c7f90af
Author: benjobs <[email protected]>
AuthorDate: Wed Jan 24 21:53:24 2024 +0800
[Improve] ingress improvement
---
.../flink/client/impl/KubernetesNativeSessionClient.scala | 10 ++++++++--
.../streampark/flink/client/impl/YarnSessionClient.scala | 8 +++++++-
.../streampark/flink/client/trait/FlinkClientTrait.scala | 7 -------
.../flink/client/trait/KubernetesNativeClientTrait.scala | 8 ++++++++
.../streampark/flink/kubernetes/KubernetesRetriever.scala | 5 +----
.../flink/kubernetes/ingress/IngressController.scala | 4 ++--
.../streampark/flink/kubernetes/ingress/IngressStrategy.scala | 11 ++++-------
.../flink/kubernetes/ingress/IngressStrategyV1.scala | 2 +-
.../flink/kubernetes/ingress/IngressStrategyV1beta1.scala | 2 +-
9 files changed, 32 insertions(+), 25 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 3606c03a0..54bfe210e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -109,8 +109,9 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
.getClusterClient
val submitResult = client.submitJob(jobGraph)
val jobId = submitResult.get().toString
+ val url = getClientURL(client, submitRequest.kubernetesNamespace,
submitRequest.clusterId)
val result =
- SubmitResponse(client.getClusterId, flinkConfig.toMap, jobId,
client.getWebInterfaceURL)
+ SubmitResponse(client.getClusterId, flinkConfig.toMap, jobId, url)
logInfo(
s"[flink-submit] flink job has been submitted.
${flinkConfIdentifierInfo(flinkConfig)}, jobId: $jobId")
result
@@ -169,7 +170,12 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
client =
clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
}
- getDeployResponse(client)
+ val url = getClientURL(client, deployRequest.kubernetesNamespace,
deployRequest.clusterId)
+ if (url != null) {
+ DeployResponse(address = url, clusterId = client.getClusterId)
+ } else {
+ DeployResponse(error = new RuntimeException("get the cluster
getWebInterfaceURL failed."))
+ }
} catch {
case e: Exception => DeployResponse(error = e)
} finally {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index 0e0b4b3c9..e901a30c1 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -198,7 +198,13 @@ object YarnSessionClient extends YarnClientTrait {
}
val clientProvider =
clusterDescriptor.deploySessionCluster(yarnClusterDescriptor._1)
client = clientProvider.getClusterClient
- getDeployResponse(client)
+ if (client.getWebInterfaceURL != null) {
+ DeployResponse(
+ address = client.getWebInterfaceURL,
+ clusterId = client.getClusterId.toString)
+ } else {
+ DeployResponse(error = new RuntimeException("get the cluster
getWebInterfaceURL failed."))
+ }
} catch {
case e: Exception => DeployResponse(error = e)
} finally {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 09b4ed8ad..f1b171310 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -524,11 +524,4 @@ trait FlinkClientTrait extends Logger {
clientWrapper.triggerSavepoint(jobID, savepointPath).get()
}
- def getDeployResponse(client: ClusterClient[_]): DeployResponse = {
- if (client.getWebInterfaceURL != null) {
- DeployResponse(address = client.getWebInterfaceURL, clusterId =
client.getClusterId.toString)
- } else {
- DeployResponse(error = new RuntimeException("get the cluster
getWebInterfaceURL failed."))
- }
- }
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 9e7468053..36bb67b91 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.enums.{ExecutionMode,
FlinkK8sRestExposedType}
import org.apache.streampark.flink.client.bean._
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
import org.apache.commons.lang3.StringUtils
@@ -33,6 +34,7 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.Service
import javax.annotation.Nonnull
import scala.language.postfixOps
+import scala.util.Try
/** kubernetes native mode submit */
trait KubernetesNativeClientTrait extends FlinkClientTrait {
@@ -182,4 +184,10 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait
{
if (k8sConf != null) k8sConf.replace("~", homePath)
else homePath.concat("/.kube/config")
}
+
+ def getClientURL(client: ClusterClient[_], namespace: String, clusterId:
String): String = {
+ Try(IngressController.getIngressUrl(namespace, clusterId, client))
+ .getOrElse(client.getWebInterfaceURL)
+ }
+
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index d2e3a8236..dd308fb96 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -34,10 +34,7 @@ import org.apache.hc.core5.util.Timeout
import javax.annotation.Nullable
-import java.time.Duration
-
import scala.collection.JavaConverters._
-import scala.util
import scala.util.{Failure, Success, Try}
object KubernetesRetriever extends Logger {
@@ -145,7 +142,7 @@ object KubernetesRetriever extends Logger {
.getOrElse(return None)) {
client =>
val url =
- IngressController.ingressUrlAddress(clusterKey.namespace,
clusterKey.clusterId, client)
+ IngressController.getIngressUrl(clusterKey.namespace,
clusterKey.clusterId, client)
logger.info(s"retrieve flink jobManager rest url: $url")
client.close()
Some(url)
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index d443a22bd..7e82f996a 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -45,11 +45,11 @@ object IngressController extends Logger {
ingressStrategy.configureIngress(domainName, clusterId, nameSpace)
}
- def ingressUrlAddress(
+ def getIngressUrl(
nameSpace: String,
clusterId: String,
clusterClient: ClusterClient[_]): String = {
- ingressStrategy.ingressUrlAddress(nameSpace, clusterId, clusterClient)
+ ingressStrategy.getIngressUrl(nameSpace, clusterId, clusterClient)
}
def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates:
String): String = {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index a049c12a4..b44c216f8 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes.ingress
-import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder,
K8sFlinkConfig}
+import org.apache.streampark.common.conf.{InternalConfigHolder, K8sFlinkConfig}
import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
import io.fabric8.kubernetes.client.DefaultKubernetesClient
@@ -28,12 +28,9 @@ import java.io.File
trait IngressStrategy {
- lazy val ingressClass =
InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
+ lazy val ingressClass: String =
InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
- def ingressUrlAddress(
- nameSpace: String,
- clusterId: String,
- clusterClient: ClusterClient[_]): String
+ def getIngressUrl(nameSpace: String, clusterId: String, clusterClient:
ClusterClient[_]): String
def configureIngress(domainName: String, clusterId: String, nameSpace:
String): Unit
@@ -50,7 +47,7 @@ trait IngressStrategy {
}
def buildIngressAnnotations(clusterId: String, namespace: String):
Map[String, String] = {
- var annotations = Map(
+ val annotations = Map(
"kubernetes.io/ingress.class" -> ingressClass,
"nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
"nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index a4e08e49b..d7fd5d37e 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -29,7 +29,7 @@ import scala.util.Try
class IngressStrategyV1 extends IngressStrategy {
- override def ingressUrlAddress(
+ override def getIngressUrl(
nameSpace: String,
clusterId: String,
clusterClient: ClusterClient[_]): String = {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index f143f073a..878f13b3e 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -30,7 +30,7 @@ import scala.util.Try
class IngressStrategyV1beta1 extends IngressStrategy {
- override def ingressUrlAddress(
+ override def getIngressUrl(
nameSpace: String,
clusterId: String,
clusterClient: ClusterClient[_]): String = {