This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new ca1c1aecf [Improve] yarn request retry improvement
ca1c1aecf is described below
commit ca1c1aecf76be59fd8a2f661f00138c14a4bf0e0
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 7 12:16:58 2023 +0800
[Improve] yarn request retry improvement
---
.../org/apache/streampark/common/util/Utils.scala | 20 ++++++-
.../apache/streampark/common/util/YarnUtils.scala | 66 ++++++++++++----------
2 files changed, 56 insertions(+), 30 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 8b6ce53ab..d2047ced5 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -21,13 +21,16 @@ import org.apache.commons.lang3.StringUtils
import java.io.{BufferedInputStream, File, FileInputStream, IOException,
PrintWriter, StringWriter}
import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble,
Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort}
import java.net.URL
+import java.time.Duration
import java.util.{jar, Collection => JavaCollection, Map => JavaMap,
Properties, UUID}
+import java.util.concurrent.locks.LockSupport
import java.util.jar.{JarFile, JarInputStream}
+import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
-object Utils {
+object Utils extends Logger {
private[this] lazy val OS = System.getProperty("os.name").toLowerCase
@@ -136,6 +139,21 @@ object Utils {
})
}
+  /**
+   * Evaluates `f` and, while it keeps failing, evaluates it again after a pause.
+   *
+   * Only exceptions that `scala.util.Try` captures are retried; anything `Try` lets through
+   * propagates to the caller immediately.
+   *
+   * @param retryCount number of additional attempts allowed once an attempt has failed; must be >= 0
+   * @param interval   pause between two consecutive attempts, 5 seconds by default
+   * @param f          by-name computation, evaluated afresh on every attempt
+   * @return `Success` holding the first result obtained, otherwise `Failure` holding the
+   *         exception of the last attempt that was made
+   */
+  @tailrec
+  def retry[R](retryCount: Int, interval: Duration = Duration.ofSeconds(5))(f: => R): Try[R] = {
+    require(retryCount >= 0, "retryCount must not be negative")
+    Try(f) match {
+      case Success(result) => Success(result)
+      case Failure(e) if retryCount > 0 =>
+        logWarn("retry failed, execution caused by: ", e)
+        logWarn(
+          s"$retryCount times retry remaining, the next attempt will be in ${interval.toMillis} ms")
+        LockSupport.parkNanos(interval.toNanos)
+        // parkNanos returns at once on an interrupted thread and leaves the interrupt flag set,
+        // so every remaining attempt would run back-to-back without any pause. Give up instead
+        // and keep the flag set so the caller can still observe the interruption.
+        if (Thread.currentThread().isInterrupted) Failure(e)
+        else retry(retryCount - 1, interval)(f)
+      case Failure(e) => Failure(e)
+    }
+  }
+
/**
* Calculates the percentage of num1 / num2; the result ranges from 0 to 100
* and keeps one decimal place.
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index bf49e4082..9eeac0643 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -260,43 +260,51 @@ object YarnUtils extends Logger {
+  /**
+   * Issues a GET request against YARN and returns the response body.
+   *
+   * An absolute `http://` / `https://` url is requested as-is. Any other url is treated as a
+   * path below the address returned by `getRMWebAppURL()`; when that request fails it is
+   * retried through `Utils.retry` (up to 5 more attempts) against `getRMWebAppURL(true)`.
+   * NOTE(review): the `true` argument presumably forces the ResourceManager address to be
+   * resolved again -- confirm against `getRMWebAppURL`.
+   *
+   * @param url absolute url, or a path relative to the ResourceManager web address
+   * @return the response body, or `null` when `url` is `null` or every attempt failed
+   */
   def restRequest(url: String): String = {
     if (url == null) return null
-    def request(reqUrl: String): String = {
-      logDebug("request url is " + reqUrl)
-      val config = RequestConfig.custom.setConnectTimeout(5000).build
-      if (hasYarnHttpKerberosAuth) {
-        HadoopUtils
-          .getUgi()
-          .doAs(new PrivilegedExceptionAction[String] {
-            override def run(): String = {
-              Try(HttpClientUtils.httpAuthGetRequest(reqUrl, config)) match {
-                case Success(v) => v
-                case Failure(e) =>
-                  logError("yarnUtils authRestRequest error, detail: ", e)
-                  null
-              }
-            }
-          })
-      } else {
-        val url = if (hasYarnHttpSampleAuth) {
-          s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
-        } else reqUrl
-        Try(HttpClientUtils.httpGetRequest(url, config)) match {
+    url match {
+      case u if u.matches("^http(|s)://.*") =>
+        Try(request(url)) match {
           case Success(v) => v
           case Failure(e) =>
-            logError("yarnUtils restRequest error, detail: ", e)
+            // Hand the exception to the logger instead of interpolating it, so the stack
+            // trace is kept.
+            if (hasYarnHttpKerberosAuth) {
+              logError(s"yarnUtils authRestRequest error, url: $u, detail: ", e)
+            } else {
+              logError(s"yarnUtils restRequest error, url: $u, detail: ", e)
+            }
             null
         }
-      }
+      case _ =>
+        Try(request(s"${getRMWebAppURL()}/$url")) match {
+          case Success(v) => v
+          case Failure(first) =>
+            // Record why the first attempt failed before falling back to the retry loop.
+            logWarn(s"yarnUtils restRequest error, url: $url, will retry. detail: ", first)
+            Utils.retry[String](5) {
+              request(s"${getRMWebAppURL(true)}/$url")
+            } match {
+              case Success(v) => v
+              case Failure(e) =>
+                logError("yarnUtils restRequest retry 5 times all failed. detail: ", e)
+                null
+            }
+        }
     }
+  }
- url match {
- case u if u.matches("^http(|s)://.*") => request(url)
- case _ =>
- val resp = request(s"${getRMWebAppURL()}/$url")
- if (resp != null) resp;
+  /**
+   * Performs a single GET request with a 5000 ms connect timeout and returns the response body.
+   *
+   * With `hasYarnHttpKerberosAuth` the request runs inside `HadoopUtils.getUgi().doAs`.
+   * Otherwise, when `hasYarnHttpSampleAuth` is set, a `user.name` query parameter is appended.
+   * Exceptions are not handled here; they propagate to the caller.
+   *
+   * @param reqUrl absolute url to request
+   * @return the response body as returned by `HttpClientUtils`
+   */
+  private[this] def request(reqUrl: String): String = {
+    val config = RequestConfig.custom.setConnectTimeout(5000).build
+    if (hasYarnHttpKerberosAuth) {
+      HadoopUtils
+        .getUgi()
+        .doAs(new PrivilegedExceptionAction[String] {
+          override def run(): String = {
+            HttpClientUtils.httpAuthGetRequest(reqUrl, config)
+          }
+        })
+    } else {
+      val url =
+        if (!hasYarnHttpSampleAuth) reqUrl
         else {
-          request(s"${getRMWebAppURL(true)}/$url")
+          // Use '&' when the url already carries a query string; a second '?' would
+          // produce a malformed url.
+          val separator = if (reqUrl.contains("?")) "&" else "?"
+          s"$reqUrl${separator}user.name=${HadoopUtils.hadoopUserName}"
         }
+      HttpClientUtils.httpGetRequest(url, config)
     }
   }