This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f801b7a32 [CELEBORN-1583] MasterClient#sendMessageInner should throw 
Throwable for celeborn.masterClient.maxRetries is 0
f801b7a32 is described below

commit f801b7a32d92a58144d3195b45e9299faf87cba9
Author: SteNicholas <[email protected]>
AuthorDate: Thu Aug 29 11:58:26 2024 +0800

    [CELEBORN-1583] MasterClient#sendMessageInner should throw Throwable for 
celeborn.masterClient.maxRetries is 0
    
    ### What changes were proposed in this pull request?
    
    `MasterClient#sendMessageInner` should throw a `Throwable` when 
`celeborn.masterClient.maxRetries` is 0.
    
    ### Why are the changes needed?
    
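    When `celeborn.masterClient.maxRetries` is 0, `MasterClient#sendMessageInner` 
throws a `NullPointerException` with the message `Cannot throw exception because 
"throwable" is null`: the retry loop never executes, so `throwable` is never 
assigned before it is thrown.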
    
    ```
    2024-08-27T19:07:03.7681998Z 24/08/27 19:07:03,767 ERROR 
[celeborn-dispatcher-2] MasterClient: Send rpc with failure, has tried 0, max 
try 0!
    2024-08-27T19:07:03.7693891Z 24/08/27 19:07:03,767 ERROR 
[celeborn-dispatcher-2] LifecycleManager: AskSync RegisterShuffle for app-1-1 
failed.
    2024-08-27T19:07:03.7695444Z java.lang.NullPointerException: Cannot throw 
exception because "throwable" is null
    2024-08-27T19:07:03.7696857Z    at 
org.apache.celeborn.common.client.MasterClient.sendMessageInner(MasterClient.java:167)
    2024-08-27T19:07:03.7698346Z    at 
org.apache.celeborn.common.client.MasterClient.askSync(MasterClient.java:121)
    2024-08-27T19:07:03.7699927Z    at 
org.apache.celeborn.client.LifecycleManager.requestMasterRequestSlots(LifecycleManager.scala:1621)
    2024-08-27T19:07:03.7701836Z    at 
org.apache.celeborn.client.LifecycleManager.requestMasterRequestSlotsWithRetry(LifecycleManager.scala:1610)
    2024-08-27T19:07:03.7703976Z    at 
org.apache.celeborn.client.LifecycleManager.org$apache$celeborn$client$LifecycleManager$$offerAndReserveSlots(LifecycleManager.scala:642)
    2024-08-27T19:07:03.7706423Z    at 
org.apache.celeborn.client.LifecycleManager$$anonfun$receiveAndReply$1.applyOrElse(LifecycleManager.scala:338)
    2024-08-27T19:07:03.7708030Z    at 
org.apache.celeborn.common.rpc.netty.Inbox.processInternal(Inbox.scala:119)
    2024-08-27T19:07:03.7709352Z    at 
org.apache.celeborn.common.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:218)
    2024-08-27T19:07:03.7710619Z    at 
org.apache.celeborn.common.rpc.netty.Inbox.safelyCall(Inbox.scala:314)
    2024-08-27T19:07:03.7711825Z    at 
org.apache.celeborn.common.rpc.netty.Inbox.process(Inbox.scala:218)
    2024-08-27T19:07:03.7713139Z    at 
org.apache.celeborn.common.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:238)
    2024-08-27T19:07:03.7714639Z    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    2024-08-27T19:07:03.7716148Z    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    2024-08-27T19:07:03.7717292Z    at 
java.base/java.lang.Thread.run(Thread.java:840)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `MasterClientSuiteJ#testSendMessageWithoutHAWithoutRetry`
    
    Closes #2715 from SteNicholas/CELEBORN-1583.
    
    Lead-authored-by: SteNicholas <[email protected]>
    Co-authored-by: Nicholas Jiang <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/common/client/MasterClient.java       | 11 +++++++++--
 .../org/apache/celeborn/common/CelebornConf.scala  |  2 +-
 .../celeborn/common/client/MasterClientSuiteJ.java | 22 +++++++++++++++++++++-
 3 files changed, 31 insertions(+), 4 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java 
b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index 7c39b2a5f..8cb9bd02c 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -31,6 +31,7 @@ import scala.Tuple2;
 import scala.concurrent.Future;
 import scala.reflect.ClassTag$;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.GeneratedMessageV3;
 import org.slf4j.Logger;
@@ -49,7 +50,7 @@ public class MasterClient {
 
   private final RpcEnv rpcEnv;
   private final MasterEndpointResolver masterEndpointResolver;
-  private final int maxRetries;
+  private int maxRetries;
 
   private final RpcTimeout rpcTimeout;
 
@@ -146,7 +147,7 @@ public class MasterClient {
     AtomicInteger currentMasterIdx = new AtomicInteger(0);
 
     long sleepLimitTime = 2000; // 2s
-    while (numTries < maxRetries && shouldRetry) {
+    while (numTries <= maxRetries && shouldRetry) {
       try {
         endpointRef = getOrSetupRpcEndpointRef(currentMasterIdx);
         Future<T> future = endpointRef.ask(message, rpcTimeout, 
ClassTag$.MODULE$.apply(clz));
@@ -229,6 +230,7 @@ public class MasterClient {
     RpcEndpointRef endpointRef = rpcEndpointRef.get();
 
     List<String> activeMasterEndpoints = 
masterEndpointResolver.getActiveMasterEndpoints();
+    maxRetries = Math.max(maxRetries, activeMasterEndpoints.size());
     // If endpoints are updated by MasterEndpointResolver, we should reset the 
currentIndex to 0.
     // This also unset the value of updated, so we don't always reset 
currentIndex to 0.
     if (masterEndpointResolver.getUpdatedAndReset()) {
@@ -273,4 +275,9 @@ public class MasterClient {
     }
     return endpointRef;
   }
+
+  @VisibleForTesting
+  public int getMaxRetries() {
+    return maxRetries;
+  }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index b3cc756bb..1730e411a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2145,7 +2145,7 @@ object CelebornConf extends Logging {
       .withAlternative("celeborn.client.maxRetries")
       .internal
       .categories("client", "worker")
-      .doc("Max retry times for client to connect master endpoint")
+      .doc("Max retry times for client to connect master endpoint. The number 
of retries will be at least equal to the number of master endpoints.")
       .version("0.3.0")
       .intConf
       .createWithDefault(15)
diff --git 
a/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
 
b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
index 5d02fd62d..2e7866f66 100644
--- 
a/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
+++ 
b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
@@ -183,7 +183,6 @@ public class MasterClientSuiteJ {
     try {
       response = client.askSync(message, HeartbeatFromWorkerResponse.class);
     } catch (Throwable t) {
-      t.printStackTrace();
       LOG.error("It should be no exceptions when sending one-way message.", t);
       fail("It should be no exceptions when sending one-way message.");
     }
@@ -191,6 +190,27 @@ public class MasterClientSuiteJ {
     assertEquals(mockResponse, response);
   }
 
+  @Test
+  public void testSendMessageWithoutHAWithoutRetry() {
+    final AtomicInteger numTries = new AtomicInteger(0);
+    final CelebornConf conf =
+        
prepareForCelebornConfWithoutHA().set(CelebornConf.MASTER_CLIENT_MAX_RETRIES(), 
0);
+
+    prepareForEndpointRefWithRetry(numTries, () -> Future$.MODULE$.failed(new 
IOException()));
+    prepareForRpcEnvWithoutHA();
+
+    MasterClient client = new MasterClient(rpcEnv, conf, false);
+    HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
+
+    try {
+      client.askSync(message, HeartbeatFromWorkerResponse.class);
+      fail("It should be exceptions when sending one-way message.");
+    } catch (Throwable t) {
+      assertTrue(t.getCause() instanceof IOException);
+      assertEquals(1, client.getMaxRetries());
+    }
+  }
+
   @Test
   public void testSendMessageWithHA() {
     final CelebornConf conf = prepareForCelebornConfWithHA();

Reply via email to