This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.1
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.1 by this push:
new e67d07de3 yarn resourceManager URL improvement
e67d07de3 is described below
commit e67d07de399f6e93b0021e84043036ff8b301627
Author: benjobs <[email protected]>
AuthorDate: Wed Jun 14 14:58:58 2023 +0800
yarn resourceManager URL improvement
---
.../org/apache/streampark/common/util/YarnUtils.scala | 16 ++++++++++------
.../streampark/console/core/bean/AlertTemplate.java | 2 +-
.../streampark/console/core/entity/FlinkCluster.java | 2 +-
.../core/service/impl/FlinkClusterServiceImpl.java | 2 +-
.../console/core/service/alert/AlertServiceTest.java | 2 +-
5 files changed, 14 insertions(+), 10 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 4c5985010..bf49e4082 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -128,9 +128,8 @@ object YarnUtils extends Logger {
* @return
* </pre>
*/
- def getRMWebAppURL(): String = {
-
- if (rmHttpURL == null) {
+ def getRMWebAppURL(getLatest: Boolean = false): String = {
+ if (rmHttpURL == null || getLatest) {
synchronized {
val conf = HadoopUtils.hadoopConf
val useHttps = YarnConfiguration.useHttps(conf)
@@ -290,9 +289,14 @@ object YarnUtils extends Logger {
}
}
- if (url.startsWith("http://") || url.startsWith("https://")) request(url)
- else {
- request(s"${getRMWebAppURL()}/$url")
+ url match {
+ case u if u.matches("^http(|s)://.*") => request(url)
+ case _ =>
+ val resp = request(s"${getRMWebAppURL()}/$url")
+ if (resp != null) resp;
+ else {
+ request(s"${getRMWebAppURL(true)}/$url")
+ }
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index 774dd6a3e..fa6487707 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -60,7 +60,7 @@ public class AlertTemplate implements Serializable {
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
String format = "%s/proxy/%s/";
- String url = String.format(format, YarnUtils.getRMWebAppURL(),
application.getAppId());
+ String url = String.format(format, YarnUtils.getRMWebAppURL(false),
application.getAppId());
template.setLink(url);
} else {
template.setLink(null);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index e7bacde14..dfc3985f2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -166,7 +166,7 @@ public class FlinkCluster implements Serializable {
return false;
} else if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum()))
{
try {
- String restUrl = YarnUtils.getRMWebAppURL() + "/proxy/" +
this.clusterId + "/overview";
+ String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" +
this.clusterId + "/overview";
String result =
HttpClientUtils.httpGetRequest(
restUrl,
RequestConfig.custom().setConnectTimeout(2000).build());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index cfb9148ab..d4570bb2a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -187,7 +187,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
if (deployResponse != null) {
if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
String address =
- YarnUtils.getRMWebAppURL() + "/proxy/" +
deployResponse.clusterId() + "/";
+ YarnUtils.getRMWebAppURL(true) + "/proxy/" +
deployResponse.clusterId() + "/";
flinkCluster.setAddress(address);
} else {
flinkCluster.setAddress(deployResponse.address());
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
index d85ef7cbc..233bdb242 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
@@ -205,7 +205,7 @@ class AlertServiceTest {
duration = application.getEndTime().getTime() -
application.getStartTime().getTime();
}
String format = "%s/proxy/%s/";
- String url = String.format(format, YarnUtils.getRMWebAppURL(),
application.getAppId());
+ String url = String.format(format, YarnUtils.getRMWebAppURL(false),
application.getAppId());
AlertTemplate template = new AlertTemplate();
template.setJobName(application.getJobName());