This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new a9f687e2c [Improve] yarnUtil getRMUrl method improvement
a9f687e2c is described below

commit a9f687e2cb5a4191f1c8d16fee9d6d40f6da6b70
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 13 11:44:50 2024 +0800

    [Improve] yarnUtil getRMUrl method improvement
---
 pom.xml                                            |   3 +-
 .../streampark/common/util/HttpClientUtils.scala   |  15 ++
 .../apache/streampark/common/util/YarnUtils.scala  | 192 ++++++++-------------
 .../streampark-console-service/pom.xml             |   8 +-
 4 files changed, 100 insertions(+), 118 deletions(-)

diff --git a/pom.xml b/pom.xml
index 34459254a..ff6275edb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -775,7 +775,8 @@
                             <exclude>.mvn/**</exclude>
                             <exclude>compiler/**</exclude>
                             <exclude>dist-material/**</exclude>
-                            <exclude>deploy/**</exclude>
+                            <exclude>docker/**</exclude>
+                            <exclude>helm/**</exclude>
                             <exclude>mvnw</exclude>
                             <exclude>mvnw.cmd</exclude>
                             <exclude>README.md</exclude>
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
index 5e2e7d5c5..604363475 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
@@ -21,6 +21,7 @@ import org.apache.http.auth.{AuthSchemeProvider, AuthScope, 
Credentials}
 import org.apache.http.client.config.{AuthSchemes, RequestConfig}
 import org.apache.http.client.entity.UrlEncodedFormEntity
 import org.apache.http.client.methods._
+import org.apache.http.client.protocol.HttpClientContext
 import org.apache.http.client.utils.URIBuilder
 import org.apache.http.config.RegistryBuilder
 import org.apache.http.entity.StringEntity
@@ -35,6 +36,7 @@ import java.security.Principal
 import java.util
 
 import scala.collection.JavaConversions._
+import scala.util.Try
 
 object HttpClientUtils {
 
@@ -204,4 +206,17 @@ object HttpClientUtils {
     }
   }
 
+  def tryCheckUrl(url: String, timeout: Int): Boolean = {
+    val httpClient = HttpClients.createDefault();
+    val context = HttpClientContext.create()
+    val httpGet = new HttpGet(url)
+    val requestConfig = RequestConfig
+      .custom()
+      .setSocketTimeout(timeout)
+      .setConnectTimeout(timeout)
+      .build()
+    httpGet.setConfig(requestConfig)
+    Try(httpClient.execute(httpGet, context)).isSuccess
+  }
+
 }
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index d20b7bb6e..9400f688f 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -26,20 +26,16 @@ import 
org.apache.hadoop.yarn.api.records.YarnApplicationState._
 import org.apache.hadoop.yarn.conf.{HAUtil, YarnConfiguration}
 import org.apache.hadoop.yarn.util.{ConverterUtils, RMHAUtils}
 import org.apache.http.client.config.RequestConfig
-import org.apache.http.client.methods.HttpGet
-import org.apache.http.client.protocol.HttpClientContext
-import org.apache.http.impl.client.HttpClients
 
 import java.io.IOException
 import java.net.InetAddress
 import java.security.PrivilegedExceptionAction
 import java.util
-import java.util.{HashMap => JavaHashMap, List => JavaList}
+import java.util.{List => JavaList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
-import scala.util.control.Breaks.{break, breakable}
 
 object YarnUtils extends Logger {
 
@@ -130,126 +126,93 @@ object YarnUtils extends Logger {
    *   </pre>
    */
   def getRMWebAppURL(getLatest: Boolean = false): String = {
-    if (rmHttpURL == null || getLatest) {
-      synchronized {
-        val conf = HadoopUtils.hadoopConf
-        val useHttps = YarnConfiguration.useHttps(conf)
-        val (addressPrefix, defaultPort, protocol) = useHttps match {
-          case x if x => (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", 
"https://")
-          case _ => (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", "http://")
-        }
+    if (!getLatest && rmHttpURL != null) {
+      return rmHttpURL
+    }
 
-        rmHttpURL = Option(conf.get("yarn.web-proxy.address", null)) match {
-          case Some(proxy) => s"$protocol$proxy"
+    synchronized {
+      val conf = HadoopUtils.hadoopConf
+      val useHttps = YarnConfiguration.useHttps(conf)
+      val (webConfKey, defaultPort, protocol) = useHttps match {
+        case x if x => (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", 
"https://")
+        case _ => (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", "http://")
+      }
+
+      def findActiveRMId(yarnConf: YarnConfiguration): String = {
+        Option(RMHAUtils.findActiveRMHAId(yarnConf)) match {
+          case Some(rmId) =>
+            logInfo("findActiveRMHAId successful")
+            rmId
           case _ =>
-            val name =
-              if (!HAUtil.isHAEnabled(conf)) addressPrefix
-              else {
-                val yarnConf = new YarnConfiguration(conf)
-                val activeRMId = {
-                  Option(RMHAUtils.findActiveRMHAId(yarnConf)) match {
-                    case Some(x) =>
-                      logInfo("findActiveRMHAId successful")
-                      x
-                    case None =>
-                      // if you don't know why, don't modify it
-                      logWarn(
-                        s"findActiveRMHAId is null,config 
yarn.acl.enable:${yarnConf.get("yarn.acl.enable")},now http try it.")
-                      // url ==> rmId
-                      val idUrlMap = new JavaHashMap[String, String]
-                      val rmIds = HAUtil.getRMHAIds(conf)
-                      rmIds.foreach(
-                        id => {
-                          val address = 
conf.get(HAUtil.addSuffix(addressPrefix, id)) match {
-                            case null =>
-                              val hostname =
-                                
conf.get(HAUtil.addSuffix("yarn.resourcemanager.hostname", id))
-                              s"$hostname:$defaultPort"
-                            case x => x
-                          }
-                          idUrlMap.put(s"$protocol$address", id)
-                        })
-                      var rmId: String = null
-                      val rpcTimeoutForChecks = yarnConf.getInt(
-                        CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_KEY,
-                        
CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_DEFAULT)
-                      breakable(
-                        idUrlMap.foreach(
-                          x => {
-                            // test yarn url
-                            val activeUrl = httpTestYarnRMUrl(x._1, 
rpcTimeoutForChecks)
-                            if (activeUrl != null) {
-                              rmId = idUrlMap(activeUrl)
-                              break
-                            }
-                          }))
-                      rmId
-                  }
-                }
-                require(
-                  activeRMId != null,
-                  "[StreamPark] YarnUtils.getRMWebAppURL: can not found yarn 
active node")
-                logInfo(s"current activeRMHAId: $activeRMId")
-                val appActiveRMKey = HAUtil.addSuffix(addressPrefix, 
activeRMId)
-                val hostnameActiveRMKey =
-                  HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, activeRMId)
-                if (
-                  null == HAUtil.getConfValueForRMInstance(
-                    appActiveRMKey,
-                    yarnConf) && null != HAUtil.getConfValueForRMInstance(
-                    hostnameActiveRMKey,
-                    yarnConf)
-                ) {
-                  logInfo(s"Find rm web address by : $hostnameActiveRMKey")
-                  hostnameActiveRMKey
-                } else {
-                  logInfo(s"Find rm web address by : $appActiveRMKey")
-                  appActiveRMKey
-                }
+            // if you don't know why, don't modify it
+            logWarn(
+              s"findActiveRMHAId is null, config 
yarn.acl.enable:${yarnConf.get("yarn.acl.enable")},now http try it."
+            )
+
+            val rpcTimeoutForChecks = yarnConf.getInt(
+              CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_KEY,
+              CommonConfigurationKeys.HA_FC_CLI_CHECK_TIMEOUT_DEFAULT
+            )
+
+            HAUtil
+              .getRMHAIds(conf)
+              .find {
+                id =>
+                  val url =
+                    protocol + Option(conf.get(HAUtil.addSuffix(webConfKey, 
id))).getOrElse {
+                      val hostname = 
conf.get(HAUtil.addSuffix("yarn.resourcemanager.hostname", id))
+                      s"$hostname:$defaultPort"
+                    }
+                  // try check yarn url
+                  HttpClientUtils.tryCheckUrl(url, rpcTimeoutForChecks)
               }
+              .getOrElse(throw new IOException(
+                "[StreamPark] YarnUtils.getRMWebAppURL: can not found yarn 
active node"))
+        }
+      }
 
-            val inetSocketAddress =
-              conf.getSocketAddr(name, s"0.0.0.0:$defaultPort", 
defaultPort.toInt)
+      def getAddressConfKey: String = {
+        val yarnConf = new YarnConfiguration(conf)
+        val activeRMId = findActiveRMId(yarnConf)
+        logInfo(s"current activeRMHAId: $activeRMId")
+        val activeRMKey = HAUtil.addSuffix(webConfKey, activeRMId)
+        val hostRMKey = HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, 
activeRMId)
+        (activeRMKey, hostRMKey) match {
+          case (key, host)
+              if HAUtil.getConfValueForRMInstance(key, yarnConf) == null &&
+                HAUtil.getConfValueForRMInstance(host, yarnConf) != null =>
+            logInfo(s"Find rm web address by : $host")
+            host
+          case (key, _) =>
+            logInfo(s"Find rm web address by : $key")
+            key
+        }
+      }
 
-            val address = NetUtils.getConnectAddress(inetSocketAddress)
+      rmHttpURL = protocol + Option(conf.get("yarn.web-proxy.address", 
null)).getOrElse {
+        val addrKey = if (!HAUtil.isHAEnabled(conf)) webConfKey else 
getAddressConfKey
+        val socketAddr = conf.getSocketAddr(addrKey, s"0.0.0.0:$defaultPort", 
defaultPort.toInt)
+        val address = NetUtils.getConnectAddress(socketAddr)
+        val resolved = address.getAddress
 
-            val buffer = new StringBuilder(protocol)
-            val resolved = address.getAddress
-            if (resolved != null && !resolved.isAnyLocalAddress && 
!resolved.isLoopbackAddress) {
-              buffer.append(address.getHostName)
-            } else {
-              Try(InetAddress.getLocalHost.getCanonicalHostName) match {
-                case Success(value) => buffer.append(value)
-                case _ => buffer.append(address.getHostName)
-              }
+        val hostName = {
+          if (resolved != null && !resolved.isAnyLocalAddress && 
!resolved.isLoopbackAddress) {
+            address.getHostName
+          } else {
+            Try(InetAddress.getLocalHost.getCanonicalHostName) match {
+              case Success(value) => value
+              case _ => address.getHostName
             }
-            buffer
-              .append(":")
-              .append(address.getPort)
-              .toString()
+          }
         }
-        logInfo(s"yarn resourceManager webapp url:$rmHttpURL")
+
+        s"$hostName:${address.getPort}"
       }
+      logInfo(s"yarn resourceManager webapp url:$rmHttpURL")
     }
     rmHttpURL
   }
 
-  private[this] def httpTestYarnRMUrl(url: String, timeout: Int): String = {
-    val httpClient = HttpClients.createDefault();
-    val context = HttpClientContext.create()
-    val httpGet = new HttpGet(url)
-    val requestConfig = RequestConfig
-      .custom()
-      .setSocketTimeout(timeout)
-      .setConnectTimeout(timeout)
-      .build()
-    httpGet.setConfig(requestConfig)
-    Try(httpClient.execute(httpGet, context)) match {
-      case Success(_) => context.getTargetHost.toString
-      case _ => null
-    }
-  }
-
   def getYarnAppTrackingUrl(applicationId: ApplicationId): String =
     HadoopUtils.yarnClient.getApplicationReport(applicationId).getTrackingUrl
 
@@ -299,10 +262,7 @@ object YarnUtils extends Logger {
         })
     } else {
       val url =
-        if (!hasYarnHttpSimpleAuth) reqUrl
-        else {
-          s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
-        }
+        if (!hasYarnHttpSimpleAuth) reqUrl else 
s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
       HttpClientUtils.httpGetRequest(url, config)
     }
   }
diff --git a/streampark-console/streampark-console-service/pom.xml 
b/streampark-console/streampark-console-service/pom.xml
index fa518b300..b53096058 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -54,7 +54,7 @@
         <lombok.version>1.18.24</lombok.version>
         <xml-apis.version>1.4.01</xml-apis.version>
         <ivy.version>2.5.0</ivy.version>
-        <eclipse.jgit.version>5.13.1.202206130422-r</eclipse.jgit.version>
+        <eclipse.jgit.version>5.13.3.202401111512-r</eclipse.jgit.version>
     </properties>
 
     <dependencyManagement>
@@ -236,6 +236,12 @@
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- postgresql -->
         <dependency>
             <groupId>org.postgresql</groupId>

Reply via email to