This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new ca1c1aecf [Improve] yarn request retry improvement
ca1c1aecf is described below

commit ca1c1aecf76be59fd8a2f661f00138c14a4bf0e0
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 7 12:16:58 2023 +0800

    [Improve] yarn request retry improvement
---
 .../org/apache/streampark/common/util/Utils.scala  | 20 ++++++-
 .../apache/streampark/common/util/YarnUtils.scala  | 66 ++++++++++++----------
 2 files changed, 56 insertions(+), 30 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 8b6ce53ab..d2047ced5 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -21,13 +21,16 @@ import org.apache.commons.lang3.StringUtils
 import java.io.{BufferedInputStream, File, FileInputStream, IOException, PrintWriter, StringWriter}
 import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble, Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort}
 import java.net.URL
+import java.time.Duration
 import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID}
+import java.util.concurrent.locks.LockSupport
 import java.util.jar.{JarFile, JarInputStream}
 
+import scala.annotation.tailrec
 import scala.collection.JavaConversions._
 import scala.util.{Failure, Success, Try}
 
-object Utils {
+object Utils extends Logger {
 
   private[this] lazy val OS = System.getProperty("os.name").toLowerCase
 
@@ -136,6 +139,21 @@ object Utils {
       })
   }
 
+  @tailrec
+  def retry[R](retryCount: Int, interval: Duration = Duration.ofSeconds(5))(f: => R): Try[R] = {
+    require(retryCount >= 0)
+    Try(f) match {
+      case Success(result) => Success(result)
+      case Failure(e) if retryCount > 0 =>
+        logWarn(s"retry failed, execution caused by: ", e)
+        logWarn(
+          s"$retryCount times retry remaining, the next attempt will be in ${interval.toMillis} ms")
+        LockSupport.parkNanos(interval.toNanos)
+        retry(retryCount - 1, interval)(f)
+      case Failure(e) => Failure(e)
+    }
+  }
+
   /**
    * calculate the percentage of num1 / num2, the result range from 0 to 100, with one small digit
    * reserve.
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index bf49e4082..9eeac0643 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -260,43 +260,51 @@ object YarnUtils extends Logger {
   def restRequest(url: String): String = {
     if (url == null) return null
 
-    def request(reqUrl: String): String = {
-      logDebug("request url is " + reqUrl)
-      val config = RequestConfig.custom.setConnectTimeout(5000).build
-      if (hasYarnHttpKerberosAuth) {
-        HadoopUtils
-          .getUgi()
-          .doAs(new PrivilegedExceptionAction[String] {
-            override def run(): String = {
-              Try(HttpClientUtils.httpAuthGetRequest(reqUrl, config)) match {
-                case Success(v) => v
-                case Failure(e) =>
-                  logError("yarnUtils authRestRequest error, detail: ", e)
-                  null
-              }
-            }
-          })
-      } else {
-        val url = if (hasYarnHttpSampleAuth) {
-          s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
-        } else reqUrl
-        Try(HttpClientUtils.httpGetRequest(url, config)) match {
+    url match {
+      case u if u.matches("^http(|s)://.*") =>
+        Try(request(url)) match {
           case Success(v) => v
           case Failure(e) =>
-            logError("yarnUtils restRequest error, detail: ", e)
+            if (hasYarnHttpKerberosAuth) {
+              logError(s"yarnUtils authRestRequest error, url: $u, detail: $e")
+            } else {
+              logError(s"yarnUtils restRequest error, url: $u, detail: $e")
+            }
             null
         }
-      }
+      case _ =>
+        Try(request(s"${getRMWebAppURL()}/$url")) match {
+          case Success(v) => v
+          case Failure(_) =>
+            Utils.retry[String](5) {
+              request(s"${getRMWebAppURL(true)}/$url")
+            } match {
+              case Success(v) => v
+              case Failure(e) =>
+                logError(s"yarnUtils restRequest retry 5 times all failed. detail: $e")
+                null
+            }
+        }
     }
+  }
 
-    url match {
-      case u if u.matches("^http(|s)://.*") => request(url)
-      case _ =>
-        val resp = request(s"${getRMWebAppURL()}/$url")
-        if (resp != null) resp;
+  private[this] def request(reqUrl: String): String = {
+    val config = RequestConfig.custom.setConnectTimeout(5000).build
+    if (hasYarnHttpKerberosAuth) {
+      HadoopUtils
+        .getUgi()
+        .doAs(new PrivilegedExceptionAction[String] {
+          override def run(): String = {
+            HttpClientUtils.httpAuthGetRequest(reqUrl, config)
+          }
+        })
+    } else {
+      val url =
+        if (!hasYarnHttpSampleAuth) reqUrl
         else {
-          request(s"${getRMWebAppURL(true)}/$url")
+          s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
         }
+      HttpClientUtils.httpGetRequest(url, config)
     }
   }
 

Reply via email to