This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch yarn-retry
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 1bdc62e133bfe971f05892d8d91b38720694b313
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 7 12:22:54 2023 +0800

    [Improve] yarn request retry improvement
---
 .../org/apache/streampark/common/util/Utils.scala  | 27 ++++++--
 .../apache/streampark/common/util/YarnUtils.scala  | 71 +++++++++++++---------
 2 files changed, 63 insertions(+), 35 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 8e98a7b92..d91dc1d78 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -16,19 +16,20 @@
  */
 package org.apache.streampark.common.util
 
-import org.apache.streampark.common.util.ImplicitsUtils._
-
 import org.apache.commons.lang3.StringUtils
 
 import java.io._
 import java.net.URL
+import java.time.Duration
 import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID}
+import java.util.concurrent.locks.LockSupport
 import java.util.jar.{JarFile, JarInputStream}
 
+import scala.annotation.tailrec
 import scala.collection.convert.ImplicitConversions._
 import scala.util.{Failure, Success, Try}
 
-object Utils {
+object Utils extends Logger {
 
   private[this] lazy val OS = System.getProperty("os.name").toLowerCase
 
@@ -128,8 +129,9 @@ object Utils {
       c => {
         try {
           if (c != null) {
-            if (c.isInstanceOf[Flushable]) {
-              c.asInstanceOf[Flushable].flush()
+            c match {
+              case flushable: Flushable => flushable.flush()
+              case _ =>
             }
             c.close()
           }
@@ -139,4 +141,19 @@ object Utils {
       })
   }
 
+  @tailrec
+  def retry[R](retryCount: Int, interval: Duration = Duration.ofSeconds(5))(f: => R): Try[R] = {
+    require(retryCount >= 0)
+    Try(f) match {
+      case Success(result) => Success(result)
+      case Failure(e) if retryCount > 0 =>
+        logWarn(s"retry failed, execution caused by: ", e)
+        logWarn(
+          s"$retryCount times retry remaining, the next attempt will be in ${interval.toMillis} ms")
+        LockSupport.parkNanos(interval.toNanos)
+        retry(retryCount - 1, interval)(f)
+      case Failure(e) => Failure(e)
+    }
+  }
+
 }
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 6030cebb2..fd844eaee 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -243,46 +243,57 @@ object YarnUtils extends Logger {
    * @return
    */
   def restRequest(url: String): String = {
+    if (url == null) return null
 
-    def request(reqUrl: String): String = {
-      logDebug("request url is " + reqUrl)
-      val config = RequestConfig.custom.setConnectTimeout(5000, TimeUnit.MILLISECONDS).build
-      if (hasYarnHttpKerberosAuth) {
-        HadoopUtils
-          .getUgi()
-          .doAs(new PrivilegedExceptionAction[String] {
-            override def run(): String = {
-              Try(HttpClientUtils.httpAuthGetRequest(reqUrl, config)) match {
-                case Success(v) => v
-                case Failure(e) =>
-                  logError("yarnUtils authRestRequest error, detail: ", e)
-                  null
-              }
-            }
-          })
-      } else {
-        val url = if (hasYarnHttpSampleAuth) {
-          s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
-        } else reqUrl
-        Try(HttpClientUtils.httpGetRequest(url, config)) match {
+    url match {
+      case u if u.matches("^http(|s)://.*") =>
+        Try(request(url)) match {
           case Success(v) => v
           case Failure(e) =>
-            logError("yarnUtils restRequest error, detail: ", e)
+            if (hasYarnHttpKerberosAuth) {
+              logError(s"yarnUtils authRestRequest error, url: $u, detail: $e")
+            } else {
+              logError(s"yarnUtils restRequest error, url: $u, detail: $e")
+            }
             null
         }
-      }
+      case _ =>
+        Try(request(s"${getRMWebAppURL()}/$url")) match {
+          case Success(v) => v
+          case Failure(_) =>
+            Utils.retry[String](5) {
+              request(s"${getRMWebAppURL(true)}/$url")
+            } match {
+              case Success(v) => v
+              case Failure(e) =>
+                logError(s"yarnUtils restRequest retry 5 times all failed. detail: $e")
+                null
+            }
+        }
     }
+  }
 
-    url match {
-      case u if u.matches("^http(|s)://.*") => request(url)
-      case _ =>
-        val resp = request(s"${getRMWebAppURL()}/$url")
-        if (resp != null) resp;
+  private[this] def request(reqUrl: String): String = {
+    val config = RequestConfig
+      .custom()
+      .setConnectTimeout(5000, TimeUnit.MILLISECONDS)
+      .build()
+    if (hasYarnHttpKerberosAuth) {
+      HadoopUtils
+        .getUgi()
+        .doAs(new PrivilegedExceptionAction[String] {
+          override def run(): String = {
+            HttpClientUtils.httpAuthGetRequest(reqUrl, config)
+          }
+        })
+    } else {
+      val url =
+        if (!hasYarnHttpSampleAuth) reqUrl
         else {
-          request(s"${getRMWebAppURL(true)}/$url")
+          s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
         }
+      HttpClientUtils.httpGetRequest(url, config)
     }
-
   }
 
 }

Reply via email to