This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b193aa19 [CELEBORN-1713] RpcTimeoutException should include RPC 
address in message
1b193aa19 is described below

commit 1b193aa196415bcc92d46b63fac6742de7a6616f
Author: SteNicholas <[email protected]>
AuthorDate: Tue Nov 26 11:06:54 2024 +0800

    [CELEBORN-1713] RpcTimeoutException should include RPC address in message
    
    ### What changes were proposed in this pull request?
    
    `RpcTimeoutException` now includes the RPC address in its message to help
    troubleshoot timeouts.
    
    ### Why are the changes needed?
    
    At present, the message of `RpcTimeoutException` does not contain the RPC
    address, which makes timeouts hard to troubleshoot because the remote RPC
    address that timed out is unknown.
    
    ```
    24/11/12 03:00:51 [Executor task launch worker for task 53432.0 in stage 
0.0 (TID 53487)] ERROR Executor: Exception in task 53432.0 in stage 0.0 (TID 
53487)
    org.apache.celeborn.common.rpc.RpcTimeoutException: Futures timed out after 
[120000 milliseconds]. This timeout is controlled by celeborn.rpc.lookupTimeout
            at 
org.apache.celeborn.common.rpc.RpcTimeout.org$apache$celeborn$common$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:46)
            at 
org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:61)
            at 
org.apache.celeborn.common.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:57)
            at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
            at 
org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
            at 
org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRefByAddr(RpcEnv.scala:106)
            at 
org.apache.celeborn.common.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:114)
            at 
org.apache.celeborn.client.ShuffleClientImpl.setupLifecycleManagerRef(ShuffleClientImpl.java:1759)
            at 
org.apache.celeborn.client.ShuffleClient.get(ShuffleClient.java:89)
            at 
org.apache.spark.shuffle.celeborn.SparkShuffleManager.getWriter(SparkShuffleManager.java:239)
            at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
            at org.apache.spark.scheduler.Task.run(Task.scala:144)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:598)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:603)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
[120000 milliseconds]
            at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
            at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
            at 
org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:316)
            at 
org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
            ... 15 more
    ```
    
    Therefore, `RpcTimeoutException` should include the RPC address in its message
    to help troubleshoot timeouts.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `NettyRpcEnvSuite#ask a message timeout on Future using RpcTimeout`
    
    Closes #2907 from SteNicholas/CELEBORN-1713.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/common/client/MasterClient.java  |  2 +-
 .../org/apache/celeborn/common/rpc/RpcEndpointRef.scala  |  2 +-
 .../scala/org/apache/celeborn/common/rpc/RpcEnv.scala    |  2 +-
 .../org/apache/celeborn/common/rpc/RpcTimeout.scala      | 16 ++++++++++------
 .../apache/celeborn/common/rpc/netty/NettyRpcEnv.scala   |  2 +-
 .../org/apache/celeborn/common/rpc/RpcEnvSuite.scala     | 13 ++++++++-----
 6 files changed, 22 insertions(+), 15 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java 
b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index 8cb9bd02c..53c550797 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -151,7 +151,7 @@ public class MasterClient {
       try {
         endpointRef = getOrSetupRpcEndpointRef(currentMasterIdx);
         Future<T> future = endpointRef.ask(message, rpcTimeout, 
ClassTag$.MODULE$.apply(clz));
-        return rpcTimeout.awaitResult(future);
+        return rpcTimeout.awaitResult(future, endpointRef.address());
       } catch (Throwable e) {
         throwable = e;
         shouldRetry = shouldRetry(endpointRef, throwable);
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
index 84f57549c..edd7005e2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
@@ -86,6 +86,6 @@ abstract class RpcEndpointRef(conf: CelebornConf)
    */
   def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
     val future = ask[T](message, timeout)
-    timeout.awaitResult(future)
+    timeout.awaitResult(future, address)
   }
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 31f69a377..7a44d8b63 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -131,7 +131,7 @@ abstract class RpcEnv(config: RpcEnvConfig) {
    * Retrieve the [[RpcEndpointRef]] represented by `addr`. This is a blocking 
action.
    */
   def setupEndpointRefByAddr(addr: RpcEndpointAddress): RpcEndpointRef = {
-    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByAddr(addr))
+    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByAddr(addr), 
addr.rpcAddress)
   }
 
   /**
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala
index 859c0e09a..ccca5614d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcTimeout.scala
@@ -42,8 +42,12 @@ private[celeborn] class RpcTimeout(val duration: 
FiniteDuration, val timeoutProp
   extends Serializable {
 
   /** Amends the standard message of TimeoutException to include the 
description */
-  private def createRpcTimeoutException(te: TimeoutException): 
RpcTimeoutException = {
-    new RpcTimeoutException(te.getMessage + ". This timeout is controlled by " 
+ timeoutProp, te)
+  private def createRpcTimeoutException(
+      te: TimeoutException,
+      rpcAddress: RpcAddress): RpcTimeoutException = {
+    new RpcTimeoutException(
+      s"${te.getMessage}. This timeout of rpc address $rpcAddress is 
controlled by $timeoutProp",
+      te)
   }
 
   /**
@@ -54,11 +58,11 @@ private[celeborn] class RpcTimeout(val duration: 
FiniteDuration, val timeoutProp
    *    val timeout = new RpcTimeout(5 millis, "short timeout")
    *    Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
    */
-  def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
+  def addMessageIfTimeout[T](rpcAddress: RpcAddress): 
PartialFunction[Throwable, T] = {
     // The exception has already been converted to a RpcTimeoutException so 
just raise it
     case rte: RpcTimeoutException => throw rte
     // Any other TimeoutException get converted to a RpcTimeoutException with 
modified message
-    case te: TimeoutException => throw createRpcTimeoutException(te)
+    case te: TimeoutException => throw createRpcTimeoutException(te, 
rpcAddress)
   }
 
   /**
@@ -69,10 +73,10 @@ private[celeborn] class RpcTimeout(val duration: 
FiniteDuration, val timeoutProp
    * @throws RpcTimeoutException if after waiting for the specified time 
`future`
    *         is still not ready
    */
-  def awaitResult[T](future: Future[T]): T = {
+  def awaitResult[T](future: Future[T], rpcAddress: RpcAddress): T = {
     try {
       ThreadUtils.awaitResult(future, duration)
-    } catch addMessageIfTimeout
+    } catch addMessageIfTimeout(rpcAddress)
   }
 }
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index 5d2d31f66..648393786 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -293,7 +293,7 @@ class NettyRpcEnv(
       case NonFatal(e) =>
         onFailure(e)
     }
-    
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
+    
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout(address))(ThreadUtils.sameThread)
   }
 
   private[celeborn] def serialize(content: Any): ByteBuffer = {
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
index 6011960f1..9e46c8b62 100644
--- a/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
@@ -775,24 +775,26 @@ abstract class RpcEnvSuite extends CelebornFunSuite {
           case _: NeverReply =>
         }
       })
+    val rpcAddress = rpcEndpointRef.address
 
     val longTimeout = new RpcTimeout(1.second, "celeborn.rpc.long.timeout")
     val shortTimeout = new RpcTimeout(10.milliseconds, 
"celeborn.rpc.short.timeout")
 
     // Ask with immediate response, should complete successfully
     val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
-    val reply1 = longTimeout.awaitResult(fut1)
+    val reply1 = longTimeout.awaitResult(fut1, rpcAddress)
     assert("hello" === reply1)
 
     // Ask with a delayed response and wait for response immediately that 
should timeout
     val fut2 = rpcEndpointRef.ask[String](NeverReply("doh"), shortTimeout)
     val reply2 =
       intercept[RpcTimeoutException] {
-        shortTimeout.awaitResult(fut2)
+        shortTimeout.awaitResult(fut2, rpcAddress)
       }.getMessage
 
     // RpcTimeout.awaitResult should have added the property to the 
TimeoutException message
-    assert(reply2.contains(shortTimeout.timeoutProp))
+    assert(reply2.contains(
+      s"This timeout of rpc address $rpcAddress is controlled by 
${shortTimeout.timeoutProp}"))
 
     // Ask with delayed response and allow the Future to timeout before 
ThreadUtils.awaitResult
     val fut3 = rpcEndpointRef.ask[String](NeverReply("goodbye"), shortTimeout)
@@ -808,13 +810,14 @@ abstract class RpcEnvSuite extends CelebornFunSuite {
 
     // When the future timed out, the recover callback should have used
     // RpcTimeout.addMessageIfTimeout to add the property to the 
TimeoutException message
-    assert(reply3.contains(shortTimeout.timeoutProp))
+    assert(reply3.contains(
+      s"This timeout of rpc address $rpcAddress is controlled by 
${shortTimeout.timeoutProp}"))
 
     // Use RpcTimeout.awaitResult to process Future, since it has already 
failed with
     // RpcTimeoutException, the same RpcTimeoutException should be thrown
     val reply4 =
       intercept[RpcTimeoutException] {
-        shortTimeout.awaitResult(fut3)
+        shortTimeout.awaitResult(fut3, rpcAddress)
       }.getMessage
 
     // Ensure description is not in message twice after addMessageIfTimeout 
and awaitResult

Reply via email to