This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new a9f687e2c [Improve] yarnUtil getRMUrl method improvement
a9f687e2c is described below
commit a9f687e2cb5a4191f1c8d16fee9d6d40f6da6b70
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 13 11:44:50 2024 +0800
[Improve] yarnUtil getRMUrl method improvement
---
pom.xml | 3 +-
.../streampark/common/util/HttpClientUtils.scala | 15 ++
.../apache/streampark/common/util/YarnUtils.scala | 192 ++++++++-------------
.../streampark-console-service/pom.xml | 8 +-
4 files changed, 100 insertions(+), 118 deletions(-)
diff --git a/pom.xml b/pom.xml
index 34459254a..ff6275edb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -775,7 +775,8 @@
<exclude>.mvn/**</exclude>
<exclude>compiler/**</exclude>
<exclude>dist-material/**</exclude>
- <exclude>deploy/**</exclude>
+ <exclude>docker/**</exclude>
+ <exclude>helm/**</exclude>
<exclude>mvnw</exclude>
<exclude>mvnw.cmd</exclude>
<exclude>README.md</exclude>
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
index 5e2e7d5c5..604363475 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
@@ -21,6 +21,7 @@ import org.apache.http.auth.{AuthSchemeProvider, AuthScope,
Credentials}
import org.apache.http.client.config.{AuthSchemes, RequestConfig}
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.methods._
+import org.apache.http.client.protocol.HttpClientContext
import org.apache.http.client.utils.URIBuilder
import org.apache.http.config.RegistryBuilder
import org.apache.http.entity.StringEntity
@@ -35,6 +36,7 @@ import java.security.Principal
import java.util
import scala.collection.JavaConversions._
+import scala.util.Try
object HttpClientUtils {
@@ -204,4 +206,17 @@ object HttpClientUtils {
}
}
+  /**
+   * Probes `url` with a single HTTP GET and reports whether the request completed.
+   *
+   * The result is `true` whenever `execute` returns without throwing; the HTTP status code of
+   * the response is not inspected. Both the response and the client are closed before returning,
+   * so repeated probes do not leak connections.
+   *
+   * @param url
+   *   the URL to probe
+   * @param timeout
+   *   value applied as both the socket timeout and the connect timeout of the request
+   *   (milliseconds, per the HttpClient `RequestConfig` API)
+   * @return
+   *   `true` if the GET request executed without an exception, `false` otherwise
+   */
+  def tryCheckUrl(url: String, timeout: Int): Boolean = {
+    val httpClient = HttpClients.createDefault()
+    try {
+      val requestConfig = RequestConfig
+        .custom()
+        .setSocketTimeout(timeout)
+        .setConnectTimeout(timeout)
+        .build()
+      val httpGet = new HttpGet(url)
+      httpGet.setConfig(requestConfig)
+      val context = HttpClientContext.create()
+      val response = Try(httpClient.execute(httpGet, context))
+      // Release the connection held by a successful response; a failure while closing
+      // must not change the outcome of the probe.
+      response.foreach(resp => Try(resp.close()))
+      response.isSuccess
+    } finally {
+      // Always shut the client down, even when building the request throws.
+      Try(httpClient.close())
+    }
+  }
+
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index d20b7bb6e..9400f688f 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -26,20 +26,16 @@ import
org.apache.hadoop.yarn.api.records.YarnApplicationState._
import org.apache.hadoop.yarn.conf.{HAUtil, YarnConfiguration}
import org.apache.hadoop.yarn.util.{ConverterUtils, RMHAUtils}
import org.apache.http.client.config.RequestConfig
-import org.apache.http.client.methods.HttpGet
-import org.apache.http.client.protocol.HttpClientContext
-import org.apache.http.impl.client.HttpClients
import java.io.IOException
import java.net.InetAddress
import java.security.PrivilegedExceptionAction
import java.util
-import java.util.{HashMap => JavaHashMap, List => JavaList}
+import java.util.{List => JavaList}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
-import scala.util.control.Breaks.{break, breakable}
object YarnUtils extends Logger {
@@ -130,126 +126,93 @@ object YarnUtils extends Logger {
* </pre>
*/
def getRMWebAppURL(getLatest: Boolean = false): String = {
- if (rmHttpURL == null || getLatest) {
- synchronized {
- val conf = HadoopUtils.hadoopConf
- val useHttps = YarnConfiguration.useHttps(conf)
- val (addressPrefix, defaultPort, protocol) = useHttps match {
- case x if x => (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090",
"https://")
- case _ => (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", "http://")
- }
+ if (!getLatest && rmHttpURL != null) {
+ return rmHttpURL
+ }
- rmHttpURL = Option(conf.get("yarn.web-proxy.address", null)) match {
- case Some(proxy) => s"$protocol$proxy"
+ synchronized {
+ val conf = HadoopUtils.hadoopConf
+ val useHttps = YarnConfiguration.useHttps(conf)
+ val (webConfKey, defaultPort, protocol) = useHttps match {
+ case x if x => (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090",
"https://")
+ case _ => (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", "http://")
+ }
+
+ def findActiveRMId(yarnConf: YarnConfiguration): String = {
+ Option(RMHAUtils.findActiveRMHAId(yarnConf)) match {
+ case Some(rmId) =>
+ logInfo("findActiveRMHAId successful")
+ rmId
case _ =>
- val name =
- if (!HAUtil.isHAEnabled(conf)) addressPrefix
- else {
- val yarnConf = new YarnConfiguration(conf)
- val activeRMId = {
- Option(RMHAUtils.findActiveRMHAId(yarnConf)) match {
- case Some(x) =>
- logInfo("findActiveRMHAId successful")
- x
- case None =>
- // if you don't know why, don't modify it
- logWarn(
- s"findActiveRMHAId is null,config
yarn.acl.enable:${yarnConf.get("yarn.acl.enable")},now http try it.")
- // url ==> rmId
- val idUrlMap = new JavaHashMap[String, String]
- val rmIds = HAUtil.getRMHAIds(conf)
- rmIds.foreach(
- id => {
- val address =
conf.get(HAUtil.addSuffix(addressPrefix, id)) match {
- case null =>
- val hostname =
-
conf.get(HAUtil.addSuffix("yarn.resourcemanager.hostname", id))
- s"$hostname:$defaultPort"
- case x => x
- }
- idUrlMap.put(s"$protocol$address", id)
- })
- var rmId: String = null
- val rpcTimeoutForChecks = yarnConf.getInt(
- CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_KEY,
-
CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_DEFAULT)
- breakable(
- idUrlMap.foreach(
- x => {
- // test yarn url
- val activeUrl = httpTestYarnRMUrl(x._1,
rpcTimeoutForChecks)
- if (activeUrl != null) {
- rmId = idUrlMap(activeUrl)
- break
- }
- }))
- rmId
- }
- }
- require(
- activeRMId != null,
- "[StreamPark] YarnUtils.getRMWebAppURL: can not found yarn
active node")
- logInfo(s"current activeRMHAId: $activeRMId")
- val appActiveRMKey = HAUtil.addSuffix(addressPrefix,
activeRMId)
- val hostnameActiveRMKey =
- HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, activeRMId)
- if (
- null == HAUtil.getConfValueForRMInstance(
- appActiveRMKey,
- yarnConf) && null != HAUtil.getConfValueForRMInstance(
- hostnameActiveRMKey,
- yarnConf)
- ) {
- logInfo(s"Find rm web address by : $hostnameActiveRMKey")
- hostnameActiveRMKey
- } else {
- logInfo(s"Find rm web address by : $appActiveRMKey")
- appActiveRMKey
- }
+ // if you don't know why, don't modify it
+ logWarn(
+ s"findActiveRMHAId is null, config
yarn.acl.enable:${yarnConf.get("yarn.acl.enable")},now http try it."
+ )
+
+ val rpcTimeoutForChecks = yarnConf.getInt(
+ CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_KEY,
+ CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_DEFAULT
+ )
+
+ HAUtil
+ .getRMHAIds(conf)
+ .find {
+ id =>
+ val url =
+ protocol + Option(conf.get(HAUtil.addSuffix(webConfKey,
id))).getOrElse {
+ val hostname =
conf.get(HAUtil.addSuffix("yarn.resourcemanager.hostname", id))
+ s"$hostname:$defaultPort"
+ }
+ // try check yarn url
+ HttpClientUtils.tryCheckUrl(url, rpcTimeoutForChecks)
}
+ .getOrElse(throw new IOException(
+ "[StreamPark] YarnUtils.getRMWebAppURL: can not found yarn
active node"))
+ }
+ }
- val inetSocketAddress =
- conf.getSocketAddr(name, s"0.0.0.0:$defaultPort",
defaultPort.toInt)
+ def getAddressConfKey: String = {
+ val yarnConf = new YarnConfiguration(conf)
+ val activeRMId = findActiveRMId(yarnConf)
+ logInfo(s"current activeRMHAId: $activeRMId")
+ val activeRMKey = HAUtil.addSuffix(webConfKey, activeRMId)
+ val hostRMKey = HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
activeRMId)
+ (activeRMKey, hostRMKey) match {
+ case (key, host)
+ if HAUtil.getConfValueForRMInstance(key, yarnConf) == null &&
+ HAUtil.getConfValueForRMInstance(host, yarnConf) != null =>
+ logInfo(s"Find rm web address by : $host")
+ host
+ case (key, _) =>
+ logInfo(s"Find rm web address by : $key")
+ key
+ }
+ }
- val address = NetUtils.getConnectAddress(inetSocketAddress)
+ rmHttpURL = protocol + Option(conf.get("yarn.web-proxy.address",
null)).getOrElse {
+ val addrKey = if (!HAUtil.isHAEnabled(conf)) webConfKey else
getAddressConfKey
+ val socketAddr = conf.getSocketAddr(addrKey, s"0.0.0.0:$defaultPort",
defaultPort.toInt)
+ val address = NetUtils.getConnectAddress(socketAddr)
+ val resolved = address.getAddress
- val buffer = new StringBuilder(protocol)
- val resolved = address.getAddress
- if (resolved != null && !resolved.isAnyLocalAddress &&
!resolved.isLoopbackAddress) {
- buffer.append(address.getHostName)
- } else {
- Try(InetAddress.getLocalHost.getCanonicalHostName) match {
- case Success(value) => buffer.append(value)
- case _ => buffer.append(address.getHostName)
- }
+ val hostName = {
+ if (resolved != null && !resolved.isAnyLocalAddress &&
!resolved.isLoopbackAddress) {
+ address.getHostName
+ } else {
+ Try(InetAddress.getLocalHost.getCanonicalHostName) match {
+ case Success(value) => value
+ case _ => address.getHostName
}
- buffer
- .append(":")
- .append(address.getPort)
- .toString()
+ }
}
- logInfo(s"yarn resourceManager webapp url:$rmHttpURL")
+
+ s"$hostName:${address.getPort}"
}
+ logInfo(s"yarn resourceManager webapp url:$rmHttpURL")
}
rmHttpURL
}
- private[this] def httpTestYarnRMUrl(url: String, timeout: Int): String = {
- val httpClient = HttpClients.createDefault();
- val context = HttpClientContext.create()
- val httpGet = new HttpGet(url)
- val requestConfig = RequestConfig
- .custom()
- .setSocketTimeout(timeout)
- .setConnectTimeout(timeout)
- .build()
- httpGet.setConfig(requestConfig)
- Try(httpClient.execute(httpGet, context)) match {
- case Success(_) => context.getTargetHost.toString
- case _ => null
- }
- }
-
def getYarnAppTrackingUrl(applicationId: ApplicationId): String =
HadoopUtils.yarnClient.getApplicationReport(applicationId).getTrackingUrl
@@ -299,10 +262,7 @@ object YarnUtils extends Logger {
})
} else {
val url =
- if (!hasYarnHttpSimpleAuth) reqUrl
- else {
- s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
- }
+ if (!hasYarnHttpSimpleAuth) reqUrl else
s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
HttpClientUtils.httpGetRequest(url, config)
}
}
diff --git a/streampark-console/streampark-console-service/pom.xml
b/streampark-console/streampark-console-service/pom.xml
index fa518b300..b53096058 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -54,7 +54,7 @@
<lombok.version>1.18.24</lombok.version>
<xml-apis.version>1.4.01</xml-apis.version>
<ivy.version>2.5.0</ivy.version>
- <eclipse.jgit.version>5.13.1.202206130422-r</eclipse.jgit.version>
+ <eclipse.jgit.version>5.13.3.202401111512-r</eclipse.jgit.version>
</properties>
<dependencyManagement>
@@ -236,6 +236,12 @@
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>com.mysql</groupId>
+ <artifactId>mysql-connector-j</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- postgresql -->
<dependency>
<groupId>org.postgresql</groupId>