This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new fe98be89f [Improve] Yarn request retry improvement (#3217)
fe98be89f is described below
commit fe98be89f4333d7bc7016ebd580dcf4b1c2363a2
Author: benjobs <[email protected]>
AuthorDate: Fri Oct 6 23:52:20 2023 -0500
[Improve] Yarn request retry improvement (#3217)
Co-authored-by: benjobs <[email protected]>
---
.../org/apache/streampark/common/util/Utils.scala | 27 ++++++--
.../apache/streampark/common/util/YarnUtils.scala | 71 +++++++++++++---------
2 files changed, 63 insertions(+), 35 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 8e98a7b92..d91dc1d78 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -16,19 +16,20 @@
*/
package org.apache.streampark.common.util
-import org.apache.streampark.common.util.ImplicitsUtils._
-
import org.apache.commons.lang3.StringUtils
import java.io._
import java.net.URL
+import java.time.Duration
import java.util.{jar, Collection => JavaCollection, Map => JavaMap,
Properties, UUID}
+import java.util.concurrent.locks.LockSupport
import java.util.jar.{JarFile, JarInputStream}
+import scala.annotation.tailrec
import scala.collection.convert.ImplicitConversions._
import scala.util.{Failure, Success, Try}
-object Utils {
+object Utils extends Logger {
private[this] lazy val OS = System.getProperty("os.name").toLowerCase
@@ -128,8 +129,9 @@ object Utils {
c => {
try {
if (c != null) {
- if (c.isInstanceOf[Flushable]) {
- c.asInstanceOf[Flushable].flush()
+ c match {
+ case flushable: Flushable => flushable.flush()
+ case _ =>
}
c.close()
}
@@ -139,4 +141,19 @@ object Utils {
})
}
+ @tailrec
+ def retry[R](retryCount: Int, interval: Duration = Duration.ofSeconds(5))(f:
=> R): Try[R] = {
+ require(retryCount >= 0)
+ Try(f) match {
+ case Success(result) => Success(result)
+ case Failure(e) if retryCount > 0 =>
+ logWarn(s"retry failed, execution caused by: ", e)
+ logWarn(
+ s"$retryCount times retry remaining, the next attempt will be in
${interval.toMillis} ms")
+ LockSupport.parkNanos(interval.toNanos)
+ retry(retryCount - 1, interval)(f)
+ case Failure(e) => Failure(e)
+ }
+ }
+
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 6030cebb2..fd844eaee 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -243,46 +243,57 @@ object YarnUtils extends Logger {
* @return
*/
def restRequest(url: String): String = {
+ if (url == null) return null
- def request(reqUrl: String): String = {
- logDebug("request url is " + reqUrl)
- val config = RequestConfig.custom.setConnectTimeout(5000,
TimeUnit.MILLISECONDS).build
- if (hasYarnHttpKerberosAuth) {
- HadoopUtils
- .getUgi()
- .doAs(new PrivilegedExceptionAction[String] {
- override def run(): String = {
- Try(HttpClientUtils.httpAuthGetRequest(reqUrl, config)) match {
- case Success(v) => v
- case Failure(e) =>
- logError("yarnUtils authRestRequest error, detail: ", e)
- null
- }
- }
- })
- } else {
- val url = if (hasYarnHttpSampleAuth) {
- s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
- } else reqUrl
- Try(HttpClientUtils.httpGetRequest(url, config)) match {
+ url match {
+ case u if u.matches("^http(|s)://.*") =>
+ Try(request(url)) match {
case Success(v) => v
case Failure(e) =>
- logError("yarnUtils restRequest error, detail: ", e)
+ if (hasYarnHttpKerberosAuth) {
+ logError(s"yarnUtils authRestRequest error, url: $u, detail: $e")
+ } else {
+ logError(s"yarnUtils restRequest error, url: $u, detail: $e")
+ }
null
}
- }
+ case _ =>
+ Try(request(s"${getRMWebAppURL()}/$url")) match {
+ case Success(v) => v
+ case Failure(_) =>
+ Utils.retry[String](5) {
+ request(s"${getRMWebAppURL(true)}/$url")
+ } match {
+ case Success(v) => v
+ case Failure(e) =>
+ logError(s"yarnUtils restRequest retry 5 times all failed.
detail: $e")
+ null
+ }
+ }
}
+ }
- url match {
- case u if u.matches("^http(|s)://.*") => request(url)
- case _ =>
- val resp = request(s"${getRMWebAppURL()}/$url")
- if (resp != null) resp;
+ private[this] def request(reqUrl: String): String = {
+ val config = RequestConfig
+ .custom()
+ .setConnectTimeout(5000, TimeUnit.MILLISECONDS)
+ .build()
+ if (hasYarnHttpKerberosAuth) {
+ HadoopUtils
+ .getUgi()
+ .doAs(new PrivilegedExceptionAction[String] {
+ override def run(): String = {
+ HttpClientUtils.httpAuthGetRequest(reqUrl, config)
+ }
+ })
+ } else {
+ val url =
+ if (!hasYarnHttpSampleAuth) reqUrl
else {
- request(s"${getRMWebAppURL(true)}/$url")
+ s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
}
+ HttpClientUtils.httpGetRequest(url, config)
}
-
}
}