This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 7f134002d [Improve] ingressUrl improvement
7f134002d is described below
commit 7f134002d9db6fa50549f70485612914a69452ef
Author: benjobs <[email protected]>
AuthorDate: Tue Jan 30 14:08:44 2024 +0800
[Improve] ingressUrl improvement
---
.../core/service/impl/ApplicationServiceImpl.java | 3 ++-
.../core/service/impl/FlinkClusterServiceImpl.java | 19 +++++++++++--------
.../client/impl/KubernetesNativeSessionClient.scala | 11 +++--------
.../client/trait/KubernetesNativeClientTrait.scala | 7 -------
.../flink/kubernetes/KubernetesRetriever.scala | 5 +++++
.../flink/kubernetes/ingress/IngressController.scala | 2 +-
6 files changed, 22 insertions(+), 25 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 6c1fc29da..bd189cdbe 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1421,6 +1421,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Override
public void starting(Application application) {
application.setState(FlinkAppState.STARTING.getValue());
+ application.setJobManagerUrl(null);
application.setOptionTime(new Date());
updateById(application);
}
@@ -1658,7 +1659,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
application.setJobId(response.jobId());
}
- if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
+ if (StringUtils.isNotEmpty(response.jobManagerUrl())) {
application.setJobManagerUrl(response.jobManagerUrl());
applicationLog.setJobManagerUrl(response.jobManagerUrl());
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 88b648728..cc04a5f5d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -32,6 +32,7 @@ import
org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.ServiceHelper;
+import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.DeployRequest;
@@ -39,8 +40,6 @@ import org.apache.streampark.flink.client.bean.DeployResponse;
import org.apache.streampark.flink.client.bean.KubernetesDeployRequest;
import org.apache.streampark.flink.client.bean.ShutDownResponse;
import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
-import org.apache.streampark.flink.kubernetes.model.ClusterKey;
-import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -92,6 +91,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Autowired private YarnQueueService yarnQueueService;
+ @Autowired private SettingService settingService;
+
@Override
public ResponseResult check(FlinkCluster cluster) {
ResponseResult result = new ResponseResult();
@@ -364,12 +365,14 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
List<FlinkCluster> clusters = list();
for (FlinkCluster cluster : clusters) {
if
(ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(cluster.getExecutionModeEnum()))
{
- cluster.setAddress(
- KubernetesRetriever.retrieveFlinkRestUrl(
- ClusterKey.of(
- TrackId.onSession(
- cluster.getK8sNamespace(), cluster.getClusterId(),
0L, null, null)))
- .getOrElse(cluster::getAddress));
+ if (StringUtils.isNotBlank(settingService.getIngressModeDefault())) {
+ String namespace = cluster.getK8sNamespace();
+ String clusterId = cluster.getClusterId();
+ String ingressUrl =
KubernetesRetriever.getSessionClusterIngressURL(namespace, clusterId);
+ if (ingressUrl != null) {
+ cluster.setAddress(ingressUrl);
+ }
+ }
}
}
return clusters;
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 54bfe210e..c9f7cd2a5 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -109,9 +109,8 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
.getClusterClient
val submitResult = client.submitJob(jobGraph)
val jobId = submitResult.get().toString
- val url = getClientURL(client, submitRequest.kubernetesNamespace,
submitRequest.clusterId)
val result =
- SubmitResponse(client.getClusterId, flinkConfig.toMap, jobId, url)
+ SubmitResponse(client.getClusterId, flinkConfig.toMap, jobId,
client.getWebInterfaceURL)
logInfo(
s"[flink-submit] flink job has been submitted.
${flinkConfIdentifierInfo(flinkConfig)}, jobId: $jobId")
result
@@ -170,12 +169,7 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
client =
clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
}
- val url = getClientURL(client, deployRequest.kubernetesNamespace,
deployRequest.clusterId)
- if (url != null) {
- DeployResponse(address = url, clusterId = client.getClusterId)
- } else {
- DeployResponse(error = new RuntimeException("get the cluster
getWebInterfaceURL failed."))
- }
+ DeployResponse(address = client.getWebInterfaceURL, clusterId =
client.getClusterId)
} catch {
case e: Exception => DeployResponse(error = e)
} finally {
@@ -239,4 +233,5 @@ object KubernetesNativeSessionClient extends
KubernetesNativeClientTrait with Lo
flinkConf.safeSet(DeploymentOptions.TARGET,
ExecutionMode.KUBERNETES_NATIVE_SESSION.getName)
super.doTriggerSavepoint(request, flinkConf)
}
+
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
index 36bb67b91..ec3134502 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/KubernetesNativeClientTrait.scala
@@ -19,7 +19,6 @@ package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.enums.{ExecutionMode,
FlinkK8sRestExposedType}
import org.apache.streampark.flink.client.bean._
-import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse
import org.apache.commons.lang3.StringUtils
@@ -34,7 +33,6 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.Service
import javax.annotation.Nonnull
import scala.language.postfixOps
-import scala.util.Try
/** kubernetes native mode submit */
trait KubernetesNativeClientTrait extends FlinkClientTrait {
@@ -185,9 +183,4 @@ trait KubernetesNativeClientTrait extends FlinkClientTrait {
else homePath.concat("/.kube/config")
}
- def getClientURL(client: ClusterClient[_], namespace: String, clusterId:
String): String = {
- Try(IngressController.getIngressUrl(namespace, clusterId, client))
- .getOrElse(client.getWebInterfaceURL)
- }
-
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index dd308fb96..fac9213b6 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -148,4 +148,9 @@ object KubernetesRetriever extends Logger {
Some(url)
}
}
+
+ def getSessionClusterIngressURL(namespace: String, clusterId: String):
String = {
+ retrieveFlinkRestUrl(ClusterKey(FlinkK8sExecuteMode.SESSION, namespace,
clusterId)).orNull
+ }
+
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index d6597ef19..8bf576329 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -29,7 +29,7 @@ object IngressController extends Logger {
private[this] val VERSION_REGEXP = "(\\d+\\.\\d+)".r
- lazy val clusterVersion = using(new DefaultKubernetesClient()) {
+ private lazy val clusterVersion = using(new DefaultKubernetesClient()) {
client =>
VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble
}