This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new fc102f072 [#1447] feat(client): Introduce configurations to control 
default behavior of RPC client (#1448)
fc102f072 is described below

commit fc102f072ac0b87cd65e032a7d77ed3290f4d4bc
Author: RickyMa <[email protected]>
AuthorDate: Tue Jan 16 17:03:00 2024 +0800

    [#1447] feat(client): Introduce configurations to control default behavior 
of RPC client (#1448)
    
    ### What changes were proposed in this pull request?
    
    Introduce two configurations to control the default behavior of the RPC 
client:
    |Property Name|Default|Description|
    |---|---|---|
    |<client_type>.rss.client.rpc.timeout.ms|60000|Timeout in milliseconds for RPC calls.|
    |<client_type>.rss.client.rpc.maxAttempts|3|When we fail to send RPC calls, we will retry for maxAttempts times.|
    
    ### Why are the changes needed?
    
    For [#1447](https://github.com/apache/incubator-uniffle/issues/1447)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually.
---
 .../uniffle/common/config/RssClientConf.java       | 12 ++++++++
 docs/client_guide/client_guide.md                  |  2 ++
 .../client/factory/ShuffleServerClientFactory.java |  2 +-
 .../client/impl/grpc/ShuffleServerGrpcClient.java  | 31 ++++++++++++++++----
 .../impl/grpc/ShuffleServerGrpcNettyClient.java    | 33 +++++++++++++++-------
 5 files changed, 63 insertions(+), 17 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index b8adb591b..3e61c6103 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -65,6 +65,18 @@ public class RssClientConf {
               "The max concurrency for single partition to write, the value is 
the max file number "
                   + "for one partition, remote shuffle server should respect 
this.");
 
+  public static final ConfigOption<Long> RPC_TIMEOUT_MS =
+      ConfigOptions.key("rss.client.rpc.timeout.ms")
+          .longType()
+          .defaultValue(60 * 1000L)
+          .withDescription("Timeout in milliseconds for RPC calls.");
+
+  public static final ConfigOption<Integer> RPC_MAX_ATTEMPTS =
+      ConfigOptions.key("rss.client.rpc.maxAttempts")
+          .intType()
+          .defaultValue(3)
+          .withDescription("When we fail to send RPC calls, we will retry for 
maxAttempts times.");
+
   public static final ConfigOption<Integer> NETTY_IO_CONNECT_TIMEOUT_MS =
       ConfigOptions.key("rss.client.netty.io.connect.timeout.ms")
           .intType()
diff --git a/docs/client_guide/client_guide.md 
b/docs/client_guide/client_guide.md
index 375d8ceea..0918a992a 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -54,6 +54,8 @@ The important configuration of client is listed as following. 
These configuratio
 |<client_type>.rss.estimate.server.assignment.enabled|false| Support mr and 
spark, whether to enable estimation of the number of ShuffleServers that need 
to be allocated based on the number of concurrent tasks.                        
                                                                                
                                                                                
                                                                                
                   [...]
 |<client_type>.rss.estimate.task.concurrency.per.server|80| It takes effect 
when rss.estimate.server.assignment.enabled=true, how many tasks are 
concurrently assigned to a ShuffleServer.                                       
                                                                                
                                                                                
                                                                                
                            [...]
 |<client_type>.rss.client.max.concurrency.of.per-partition.write|-| The 
maximum number of files that can be written concurrently to a single partition 
is determined. This value will only be respected by the remote shuffle server 
if it is greater than 0.                                                        
                                                                                
                                                                                
                        [...]
+|<client_type>.rss.client.rpc.timeout.ms|60000| Timeout in milliseconds for 
RPC calls.                                                                      
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+|<client_type>.rss.client.rpc.maxAttempts|3| When we fail to send RPC calls, 
we will retry for maxAttempts times.                                            
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
 Notice:
 
 1. `<client_type>` should be `mapreduce` `tez` or `spark`
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
index 25c26f9ad..8b4f5f91a 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
@@ -47,7 +47,7 @@ public class ShuffleServerClientFactory {
       String clientType, ShuffleServerInfo shuffleServerInfo, RssConf rssConf) 
{
     if (clientType.equalsIgnoreCase(ClientType.GRPC.name())) {
       return new ShuffleServerGrpcClient(
-          shuffleServerInfo.getHost(), shuffleServerInfo.getGrpcPort());
+          rssConf, shuffleServerInfo.getHost(), 
shuffleServerInfo.getGrpcPort());
     } else if (clientType.equalsIgnoreCase(ClientType.GRPC_NETTY.name())) {
       return new ShuffleServerGrpcNettyClient(
           rssConf,
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index cd6f43e41..18144378b 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -62,6 +62,8 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.NotRetryException;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.exception.RssFetchFailedException;
@@ -108,22 +110,39 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleServerGrpcClient.class);
   protected static final long FAILED_REQUIRE_ID = -1;
-  protected static final long RPC_TIMEOUT_DEFAULT_MS = 60000;
-  private long rpcTimeout = RPC_TIMEOUT_DEFAULT_MS;
+  protected long rpcTimeout;
   private ShuffleServerBlockingStub blockingStub;
 
+  @VisibleForTesting
   public ShuffleServerGrpcClient(String host, int port) {
-    this(host, port, 3);
+    this(
+        host,
+        port,
+        RssClientConf.RPC_MAX_ATTEMPTS.defaultValue(),
+        RssClientConf.RPC_TIMEOUT_MS.defaultValue());
+  }
+
+  public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
+    this(
+        host,
+        port,
+        rssConf == null
+            ? RssClientConf.RPC_MAX_ATTEMPTS.defaultValue()
+            : rssConf.getInteger(RssClientConf.RPC_MAX_ATTEMPTS),
+        rssConf == null
+            ? RssClientConf.RPC_TIMEOUT_MS.defaultValue()
+            : rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS));
   }
 
-  public ShuffleServerGrpcClient(String host, int port, int maxRetryAttempts) {
-    this(host, port, maxRetryAttempts, true);
+  public ShuffleServerGrpcClient(String host, int port, int maxRetryAttempts, 
long rpcTimeoutMs) {
+    this(host, port, maxRetryAttempts, rpcTimeoutMs, true);
   }
 
   public ShuffleServerGrpcClient(
-      String host, int port, int maxRetryAttempts, boolean usePlaintext) {
+      String host, int port, int maxRetryAttempts, long rpcTimeoutMs, boolean 
usePlaintext) {
     super(host, port, maxRetryAttempts, usePlaintext);
     blockingStub = ShuffleServerGrpc.newBlockingStub(channel);
+    rpcTimeout = rpcTimeoutMs;
   }
 
   public ShuffleServerBlockingStub getBlockingStub() {
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 881934ac4..a8ee15490 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -33,6 +33,7 @@ import 
org.apache.uniffle.client.response.RssGetShuffleDataResponse;
 import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
 import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.NotRetryException;
 import org.apache.uniffle.common.exception.RssException;
@@ -58,12 +59,27 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
   private TransportClientFactory clientFactory;
 
   public ShuffleServerGrpcNettyClient(RssConf rssConf, String host, int 
grpcPort, int nettyPort) {
-    this(rssConf, host, grpcPort, nettyPort, 3);
+    this(
+        rssConf,
+        host,
+        grpcPort,
+        nettyPort,
+        rssConf == null
+            ? RssClientConf.RPC_MAX_ATTEMPTS.defaultValue()
+            : rssConf.getInteger(RssClientConf.RPC_MAX_ATTEMPTS),
+        rssConf == null
+            ? RssClientConf.RPC_TIMEOUT_MS.defaultValue()
+            : rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS));
   }
 
   public ShuffleServerGrpcNettyClient(
-      RssConf rssConf, String host, int grpcPort, int nettyPort, int 
maxRetryAttempts) {
-    super(host, grpcPort, maxRetryAttempts);
+      RssConf rssConf,
+      String host,
+      int grpcPort,
+      int nettyPort,
+      int maxRetryAttempts,
+      long rpcTimeoutMs) {
+    super(host, grpcPort, maxRetryAttempts, rpcTimeoutMs);
     this.nettyPort = nettyPort;
     TransportContext transportContext = new TransportContext(new 
TransportConf(rssConf));
     this.clientFactory = new TransportClientFactory(transportContext);
@@ -116,7 +132,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
                       System.currentTimeMillis());
               long start = System.currentTimeMillis();
               RpcResponse rpcResponse =
-                  transportClient.sendRpcSync(sendShuffleDataRequest, 
RPC_TIMEOUT_DEFAULT_MS);
+                  transportClient.sendRpcSync(sendShuffleDataRequest, 
rpcTimeout);
               if (LOG.isDebugEnabled()) {
                 LOG.debug(
                     "Do sendShuffleData to {}:{} rpc cost:"
@@ -193,8 +209,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
             + "], lastBlockId["
             + request.getLastBlockId()
             + "]";
-    RpcResponse rpcResponse =
-        transportClient.sendRpcSync(getMemoryShuffleDataRequest, 
RPC_TIMEOUT_DEFAULT_MS);
+    RpcResponse rpcResponse = 
transportClient.sendRpcSync(getMemoryShuffleDataRequest, rpcTimeout);
     GetMemoryShuffleDataResponse getMemoryShuffleDataResponse =
         (GetMemoryShuffleDataResponse) rpcResponse;
     StatusCode statusCode = rpcResponse.getStatusCode();
@@ -231,8 +246,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
             request.getPartitionNumPerRange(),
             request.getPartitionNum());
     long start = System.currentTimeMillis();
-    RpcResponse rpcResponse =
-        transportClient.sendRpcSync(getLocalShuffleIndexRequest, 
RPC_TIMEOUT_DEFAULT_MS);
+    RpcResponse rpcResponse = 
transportClient.sendRpcSync(getLocalShuffleIndexRequest, rpcTimeout);
     String requestInfo =
         "appId["
             + request.getAppId()
@@ -285,8 +299,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
             request.getLength(),
             System.currentTimeMillis());
     long start = System.currentTimeMillis();
-    RpcResponse rpcResponse =
-        transportClient.sendRpcSync(getLocalShuffleIndexRequest, 
RPC_TIMEOUT_DEFAULT_MS);
+    RpcResponse rpcResponse = 
transportClient.sendRpcSync(getLocalShuffleIndexRequest, rpcTimeout);
     String requestInfo =
         "appId["
             + request.getAppId()

Reply via email to