This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1b193aa19 [CELEBORN-1713] RpcTimeoutException should include RPC
address in message
1b193aa19 is described below
commit 1b193aa196415bcc92d46b63fac6742de7a6616f
Author: SteNicholas <[email protected]>
AuthorDate: Tue Nov 26 11:06:54 2024 +0800
[CELEBORN-1713] RpcTimeoutException should include RPC address in message
### What changes were proposed in this pull request?
`RpcTimeoutException` now includes the RPC address in its message to help
troubleshoot timeouts.
### Why are the changes needed?
At present, the message of `RpcTimeoutException` does not contain the RPC
address, which makes timeout problems hard to troubleshoot because the RPC
address that timed out is unknown.
```
24/11/12 03:00:51 [Executor task launch worker for task 53432.0 in stage
0.0 (TID 53487)] ERROR Executor: Exception in task 53432.0 in stage 0.0 (TID
53487)
org.apache.celeborn.common.rpc.RpcTimeoutException: Futures timed out after
[120000 milliseconds]. This timeout is controlled by celeborn.rpc.lookupTimeout
at
org.apache.celeborn.common.rpc.RpcTimeout.org$apache$celeborn$common$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:46)
at
org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:61)
at
org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:57)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
at
org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at
org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRefByAddr(RpcEnv.scala:106)
at
org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:114)
at
org.apache.celeborn.client.ShuffleClientImpl.setupLifecycleManagerRef(ShuffleClientImpl.java:1759)
at
org.apache.celeborn.client.ShuffleClient.get(ShuffleClient.java:89)
at
org.apache.spark.shuffle.celeborn.SparkShuffleManager.getWriter(SparkShuffleManager.java:239)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:144)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:598)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:603)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[120000 milliseconds]
at
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at
org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:316)
at
org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
... 15 more
```
Therefore, `RpcTimeoutException` should include the RPC address in its message
to help troubleshoot timeouts.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`NettyRpcEnvSuite#ask a message timeout on Future using RpcTimeout`
Closes #2907 from SteNicholas/CELEBORN-1713.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/common/client/MasterClient.java | 2 +-
.../org/apache/celeborn/common/rpc/RpcEndpointRef.scala | 2 +-
.../scala/org/apache/celeborn/common/rpc/RpcEnv.scala | 2 +-
.../org/apache/celeborn/common/rpc/RpcTimeout.scala | 16 ++++++++++------
.../apache/celeborn/common/rpc/netty/NettyRpcEnv.scala | 2 +-
.../org/apache/celeborn/common/rpc/RpcEnvSuite.scala | 13 ++++++++-----
6 files changed, 22 insertions(+), 15 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index 8cb9bd02c..53c550797 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -151,7 +151,7 @@ public class MasterClient {
try {
endpointRef = getOrSetupRpcEndpointRef(currentMasterIdx);
Future<T> future = endpointRef.ask(message, rpcTimeout,
ClassTag$.MODULE$.apply(clz));
- return rpcTimeout.awaitResult(future);
+ return rpcTimeout.awaitResult(future, endpointRef.address());
} catch (Throwable e) {
throwable = e;
shouldRetry = shouldRetry(endpointRef, throwable);
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
index 84f57549c..edd7005e2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
@@ -86,6 +86,6 @@ abstract class RpcEndpointRef(conf: CelebornConf)
*/
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
val future = ask[T](message, timeout)
- timeout.awaitResult(future)
+ timeout.awaitResult(future, address)
}
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 31f69a377..7a44d8b63 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -131,7 +131,7 @@ abstract class RpcEnv(config: RpcEnvConfig) {
* Retrieve the [[RpcEndpointRef]] represented by `addr`. This is a blocking
action.
*/
def setupEndpointRefByAddr(addr: RpcEndpointAddress): RpcEndpointRef = {
- defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByAddr(addr))
+ defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByAddr(addr),
addr.rpcAddress)
}
/**
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala
index 859c0e09a..ccca5614d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala
@@ -42,8 +42,12 @@ private[celeborn] class RpcTimeout(val duration:
FiniteDuration, val timeoutProp
extends Serializable {
/** Amends the standard message of TimeoutException to include the
description */
- private def createRpcTimeoutException(te: TimeoutException):
RpcTimeoutException = {
- new RpcTimeoutException(te.getMessage + ". This timeout is controlled by "
+ timeoutProp, te)
+ private def createRpcTimeoutException(
+ te: TimeoutException,
+ rpcAddress: RpcAddress): RpcTimeoutException = {
+ new RpcTimeoutException(
+ s"${te.getMessage}. This timeout of rpc address $rpcAddress is
controlled by $timeoutProp",
+ te)
}
/**
@@ -54,11 +58,11 @@ private[celeborn] class RpcTimeout(val duration:
FiniteDuration, val timeoutProp
* val timeout = new RpcTimeout(5 millis, "short timeout")
* Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
*/
- def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
+ def addMessageIfTimeout[T](rpcAddress: RpcAddress):
PartialFunction[Throwable, T] = {
// The exception has already been converted to a RpcTimeoutException so
just raise it
case rte: RpcTimeoutException => throw rte
// Any other TimeoutException get converted to a RpcTimeoutException with
modified message
- case te: TimeoutException => throw createRpcTimeoutException(te)
+ case te: TimeoutException => throw createRpcTimeoutException(te,
rpcAddress)
}
/**
@@ -69,10 +73,10 @@ private[celeborn] class RpcTimeout(val duration:
FiniteDuration, val timeoutProp
* @throws RpcTimeoutException if after waiting for the specified time
`future`
* is still not ready
*/
- def awaitResult[T](future: Future[T]): T = {
+ def awaitResult[T](future: Future[T], rpcAddress: RpcAddress): T = {
try {
ThreadUtils.awaitResult(future, duration)
- } catch addMessageIfTimeout
+ } catch addMessageIfTimeout(rpcAddress)
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index 5d2d31f66..648393786 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -293,7 +293,7 @@ class NettyRpcEnv(
case NonFatal(e) =>
onFailure(e)
}
-
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
+
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout(address))(ThreadUtils.sameThread)
}
private[celeborn] def serialize(content: Any): ByteBuffer = {
diff --git
a/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
index 6011960f1..9e46c8b62 100644
--- a/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
@@ -775,24 +775,26 @@ abstract class RpcEnvSuite extends CelebornFunSuite {
case _: NeverReply =>
}
})
+ val rpcAddress = rpcEndpointRef.address
val longTimeout = new RpcTimeout(1.second, "celeborn.rpc.long.timeout")
val shortTimeout = new RpcTimeout(10.milliseconds,
"celeborn.rpc.short.timeout")
// Ask with immediate response, should complete successfully
val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
- val reply1 = longTimeout.awaitResult(fut1)
+ val reply1 = longTimeout.awaitResult(fut1, rpcAddress)
assert("hello" === reply1)
// Ask with a delayed response and wait for response immediately that
should timeout
val fut2 = rpcEndpointRef.ask[String](NeverReply("doh"), shortTimeout)
val reply2 =
intercept[RpcTimeoutException] {
- shortTimeout.awaitResult(fut2)
+ shortTimeout.awaitResult(fut2, rpcAddress)
}.getMessage
// RpcTimeout.awaitResult should have added the property to the
TimeoutException message
- assert(reply2.contains(shortTimeout.timeoutProp))
+ assert(reply2.contains(
+ s"This timeout of rpc address $rpcAddress is controlled by
${shortTimeout.timeoutProp}"))
// Ask with delayed response and allow the Future to timeout before
ThreadUtils.awaitResult
val fut3 = rpcEndpointRef.ask[String](NeverReply("goodbye"), shortTimeout)
@@ -808,13 +810,14 @@ abstract class RpcEnvSuite extends CelebornFunSuite {
// When the future timed out, the recover callback should have used
// RpcTimeout.addMessageIfTimeout to add the property to the
TimeoutException message
- assert(reply3.contains(shortTimeout.timeoutProp))
+ assert(reply3.contains(
+ s"This timeout of rpc address $rpcAddress is controlled by
${shortTimeout.timeoutProp}"))
// Use RpcTimeout.awaitResult to process Future, since it has already
failed with
// RpcTimeoutException, the same RpcTimeoutException should be thrown
val reply4 =
intercept[RpcTimeoutException] {
- shortTimeout.awaitResult(fut3)
+ shortTimeout.awaitResult(fut3, rpcAddress)
}.getMessage
// Ensure description is not in message twice after addMessageIfTimeout
and awaitResult