This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 4e5ff17eb [CELEBORN-1583] MasterClient#sendMessageInner should throw
Throwable for celeborn.masterClient.maxRetries is 0
4e5ff17eb is described below
commit 4e5ff17eb1be7cebdf32383e51141b2d7e862cf8
Author: SteNicholas <[email protected]>
AuthorDate: Thu Aug 29 11:58:26 2024 +0800
[CELEBORN-1583] MasterClient#sendMessageInner should throw Throwable for
celeborn.masterClient.maxRetries is 0
### What changes were proposed in this pull request?
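`MasterClient#sendMessageInner` should throw the underlying `Throwable` when
`celeborn.masterClient.maxRetries` is 0. The retry loop condition becomes
`numTries <= maxRetries`, and `maxRetries` is raised to at least the number of
active master endpoints.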
### Why are the changes needed?
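When `celeborn.masterClient.maxRetries` is 0, `MasterClient#sendMessageInner`
fails with a `NullPointerException` (`Cannot throw exception because
"throwable" is null`): the retry loop `while (numTries < maxRetries && ...)`
never runs, so no `Throwable` is captured before the method tries to rethrow
it.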
```
2024-08-27T19:07:03.7681998Z 24/08/27 19:07:03,767 ERROR
[celeborn-dispatcher-2] MasterClient: Send rpc with failure, has tried 0, max
try 0!
2024-08-27T19:07:03.7693891Z 24/08/27 19:07:03,767 ERROR
[celeborn-dispatcher-2] LifecycleManager: AskSync RegisterShuffle for app-1-1
failed.
2024-08-27T19:07:03.7695444Z java.lang.NullPointerException: Cannot throw
exception because "throwable" is null
2024-08-27T19:07:03.7696857Z at
org.apache.celeborn.common.client.MasterClient.sendMessageInner(MasterClient.java:167)
2024-08-27T19:07:03.7698346Z at
org.apache.celeborn.common.client.MasterClient.askSync(MasterClient.java:121)
2024-08-27T19:07:03.7699927Z at
org.apache.celeborn.client.LifecycleManager.requestMasterRequestSlots(LifecycleManager.scala:1621)
2024-08-27T19:07:03.7701836Z at
org.apache.celeborn.client.LifecycleManager.requestMasterRequestSlotsWithRetry(LifecycleManager.scala:1610)
2024-08-27T19:07:03.7703976Z at
org.apache.celeborn.client.LifecycleManager.org$apache$celeborn$client$LifecycleManager$$offerAndReserveSlots(LifecycleManager.scala:642)
2024-08-27T19:07:03.7706423Z at
org.apache.celeborn.client.LifecycleManager$$anonfun$receiveAndReply$1.applyOrElse(LifecycleManager.scala:338)
2024-08-27T19:07:03.7708030Z at
org.apache.celeborn.common.rpc.netty.Inbox.processInternal(Inbox.scala:119)
2024-08-27T19:07:03.7709352Z at
org.apache.celeborn.common.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:218)
2024-08-27T19:07:03.7710619Z at
org.apache.celeborn.common.rpc.netty.Inbox.safelyCall(Inbox.scala:314)
2024-08-27T19:07:03.7711825Z at
org.apache.celeborn.common.rpc.netty.Inbox.process(Inbox.scala:218)
2024-08-27T19:07:03.7713139Z at
org.apache.celeborn.common.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:238)
2024-08-27T19:07:03.7714639Z at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
2024-08-27T19:07:03.7716148Z at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
2024-08-27T19:07:03.7717292Z at
java.base/java.lang.Thread.run(Thread.java:840)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`MasterClientSuiteJ#testSendMessageWithoutHAWithoutRetry`
Closes #2715 from SteNicholas/CELEBORN-1583.
Lead-authored-by: SteNicholas <[email protected]>
Co-authored-by: Nicholas Jiang <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit f801b7a32d92a58144d3195b45e9299faf87cba9)
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/common/client/MasterClient.java | 11 +++++++++--
.../org/apache/celeborn/common/CelebornConf.scala | 2 +-
.../celeborn/common/client/MasterClientSuiteJ.java | 22 +++++++++++++++++++++-
3 files changed, 31 insertions(+), 4 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
index 7c39b2a5f..8cb9bd02c 100644
--- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java
@@ -31,6 +31,7 @@ import scala.Tuple2;
import scala.concurrent.Future;
import scala.reflect.ClassTag$;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.GeneratedMessageV3;
import org.slf4j.Logger;
@@ -49,7 +50,7 @@ public class MasterClient {
private final RpcEnv rpcEnv;
private final MasterEndpointResolver masterEndpointResolver;
- private final int maxRetries;
+ private int maxRetries;
private final RpcTimeout rpcTimeout;
@@ -146,7 +147,7 @@ public class MasterClient {
AtomicInteger currentMasterIdx = new AtomicInteger(0);
long sleepLimitTime = 2000; // 2s
- while (numTries < maxRetries && shouldRetry) {
+ while (numTries <= maxRetries && shouldRetry) {
try {
endpointRef = getOrSetupRpcEndpointRef(currentMasterIdx);
Future<T> future = endpointRef.ask(message, rpcTimeout,
ClassTag$.MODULE$.apply(clz));
@@ -229,6 +230,7 @@ public class MasterClient {
RpcEndpointRef endpointRef = rpcEndpointRef.get();
List<String> activeMasterEndpoints =
masterEndpointResolver.getActiveMasterEndpoints();
+ maxRetries = Math.max(maxRetries, activeMasterEndpoints.size());
// If endpoints are updated by MasterEndpointResolver, we should reset the
currentIndex to 0.
// This also unset the value of updated, so we don't always reset
currentIndex to 0.
if (masterEndpointResolver.getUpdatedAndReset()) {
@@ -273,4 +275,9 @@ public class MasterClient {
}
return endpointRef;
}
+
+ @VisibleForTesting
+ public int getMaxRetries() {
+ return maxRetries;
+ }
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 2169e20ee..e006e765f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2103,7 +2103,7 @@ object CelebornConf extends Logging {
.withAlternative("celeborn.client.maxRetries")
.internal
.categories("client", "worker")
- .doc("Max retry times for client to connect master endpoint")
+ .doc("Max retry times for client to connect master endpoint. The number
of retries will be at least equal to the number of master endpoints.")
.version("0.3.0")
.intConf
.createWithDefault(15)
diff --git
a/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
index 5d02fd62d..2e7866f66 100644
---
a/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java
@@ -183,7 +183,6 @@ public class MasterClientSuiteJ {
try {
response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
- t.printStackTrace();
LOG.error("It should be no exceptions when sending one-way message.", t);
fail("It should be no exceptions when sending one-way message.");
}
@@ -191,6 +190,27 @@ public class MasterClientSuiteJ {
assertEquals(mockResponse, response);
}
+ @Test
+ public void testSendMessageWithoutHAWithoutRetry() {
+ final AtomicInteger numTries = new AtomicInteger(0);
+ final CelebornConf conf =
+
prepareForCelebornConfWithoutHA().set(CelebornConf.MASTER_CLIENT_MAX_RETRIES(),
0);
+
+ prepareForEndpointRefWithRetry(numTries, () -> Future$.MODULE$.failed(new
IOException()));
+ prepareForRpcEnvWithoutHA();
+
+ MasterClient client = new MasterClient(rpcEnv, conf, false);
+ HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);
+
+ try {
+ client.askSync(message, HeartbeatFromWorkerResponse.class);
+ fail("It should be exceptions when sending one-way message.");
+ } catch (Throwable t) {
+ assertTrue(t.getCause() instanceof IOException);
+ assertEquals(1, client.getMaxRetries());
+ }
+ }
+
@Test
public void testSendMessageWithHA() {
final CelebornConf conf = prepareForCelebornConfWithHA();