This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fc102f072 [#1447] feat(client): Introduce configurations to control
default behavior of RPC client (#1448)
fc102f072 is described below
commit fc102f072ac0b87cd65e032a7d77ed3290f4d4bc
Author: RickyMa <[email protected]>
AuthorDate: Tue Jan 16 17:03:00 2024 +0800
[#1447] feat(client): Introduce configurations to control default behavior
of RPC client (#1448)
### What changes were proposed in this pull request?
Introduce two configurations to control the default behavior of the RPC
client:
|Property Name|Default|Description|
|---|---|---|
|<client_type>.rss.client.rpc.timeout.ms|60000|Timeout in milliseconds for RPC calls.|
|<client_type>.rss.client.rpc.maxAttempts|3|When we fail to send RPC calls, we will retry for maxAttempts times.|
### Why are the changes needed?
For [#1447](https://github.com/apache/incubator-uniffle/issues/1447)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually.
---
.../uniffle/common/config/RssClientConf.java | 12 ++++++++
docs/client_guide/client_guide.md | 2 ++
.../client/factory/ShuffleServerClientFactory.java | 2 +-
.../client/impl/grpc/ShuffleServerGrpcClient.java | 31 ++++++++++++++++----
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 33 +++++++++++++++-------
5 files changed, 63 insertions(+), 17 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index b8adb591b..3e61c6103 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -65,6 +65,18 @@ public class RssClientConf {
"The max concurrency for single partition to write, the value is
the max file number "
+ "for one partition, remote shuffle server should respect
this.");
+ public static final ConfigOption<Long> RPC_TIMEOUT_MS =
+ ConfigOptions.key("rss.client.rpc.timeout.ms")
+ .longType()
+ .defaultValue(60 * 1000L)
+ .withDescription("Timeout in milliseconds for RPC calls.");
+
+ public static final ConfigOption<Integer> RPC_MAX_ATTEMPTS =
+ ConfigOptions.key("rss.client.rpc.maxAttempts")
+ .intType()
+ .defaultValue(3)
+ .withDescription("When we fail to send RPC calls, we will retry for
maxAttempts times.");
+
public static final ConfigOption<Integer> NETTY_IO_CONNECT_TIMEOUT_MS =
ConfigOptions.key("rss.client.netty.io.connect.timeout.ms")
.intType()
diff --git a/docs/client_guide/client_guide.md
b/docs/client_guide/client_guide.md
index 375d8ceea..0918a992a 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -54,6 +54,8 @@ The important configuration of client is listed as following.
These configuratio
|<client_type>.rss.estimate.server.assignment.enabled|false| Support mr and
spark, whether to enable estimation of the number of ShuffleServers that need
to be allocated based on the number of concurrent tasks.
[...]
|<client_type>.rss.estimate.task.concurrency.per.server|80| It takes effect
when rss.estimate.server.assignment.enabled=true, how many tasks are
concurrently assigned to a ShuffleServer.
[...]
|<client_type>.rss.client.max.concurrency.of.per-partition.write|-| The
maximum number of files that can be written concurrently to a single partition
is determined. This value will only be respected by the remote shuffle server
if it is greater than 0.
[...]
+|<client_type>.rss.client.rpc.timeout.ms|60000| Timeout in milliseconds for
RPC calls.
[...]
+|<client_type>.rss.client.rpc.maxAttempts|3| When we fail to send RPC calls,
we will retry for maxAttempts times.
[...]
Notice:
1. `<client_type>` should be `mapreduce` `tez` or `spark`
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
index 25c26f9ad..8b4f5f91a 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/factory/ShuffleServerClientFactory.java
@@ -47,7 +47,7 @@ public class ShuffleServerClientFactory {
String clientType, ShuffleServerInfo shuffleServerInfo, RssConf rssConf)
{
if (clientType.equalsIgnoreCase(ClientType.GRPC.name())) {
return new ShuffleServerGrpcClient(
- shuffleServerInfo.getHost(), shuffleServerInfo.getGrpcPort());
+ rssConf, shuffleServerInfo.getHost(),
shuffleServerInfo.getGrpcPort());
} else if (clientType.equalsIgnoreCase(ClientType.GRPC_NETTY.name())) {
return new ShuffleServerGrpcNettyClient(
rssConf,
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index cd6f43e41..18144378b 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -62,6 +62,8 @@ import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
@@ -108,22 +110,39 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleServerGrpcClient.class);
protected static final long FAILED_REQUIRE_ID = -1;
- protected static final long RPC_TIMEOUT_DEFAULT_MS = 60000;
- private long rpcTimeout = RPC_TIMEOUT_DEFAULT_MS;
+ protected long rpcTimeout;
private ShuffleServerBlockingStub blockingStub;
+ @VisibleForTesting
public ShuffleServerGrpcClient(String host, int port) {
- this(host, port, 3);
+ this(
+ host,
+ port,
+ RssClientConf.RPC_MAX_ATTEMPTS.defaultValue(),
+ RssClientConf.RPC_TIMEOUT_MS.defaultValue());
+ }
+
+ public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
+ this(
+ host,
+ port,
+ rssConf == null
+ ? RssClientConf.RPC_MAX_ATTEMPTS.defaultValue()
+ : rssConf.getInteger(RssClientConf.RPC_MAX_ATTEMPTS),
+ rssConf == null
+ ? RssClientConf.RPC_TIMEOUT_MS.defaultValue()
+ : rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS));
}
- public ShuffleServerGrpcClient(String host, int port, int maxRetryAttempts) {
- this(host, port, maxRetryAttempts, true);
+ public ShuffleServerGrpcClient(String host, int port, int maxRetryAttempts,
long rpcTimeoutMs) {
+ this(host, port, maxRetryAttempts, rpcTimeoutMs, true);
}
public ShuffleServerGrpcClient(
- String host, int port, int maxRetryAttempts, boolean usePlaintext) {
+ String host, int port, int maxRetryAttempts, long rpcTimeoutMs, boolean
usePlaintext) {
super(host, port, maxRetryAttempts, usePlaintext);
blockingStub = ShuffleServerGrpc.newBlockingStub(channel);
+ rpcTimeout = rpcTimeoutMs;
}
public ShuffleServerBlockingStub getBlockingStub() {
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 881934ac4..a8ee15490 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -33,6 +33,7 @@ import
org.apache.uniffle.client.response.RssGetShuffleDataResponse;
import org.apache.uniffle.client.response.RssGetShuffleIndexResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;
@@ -58,12 +59,27 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
private TransportClientFactory clientFactory;
public ShuffleServerGrpcNettyClient(RssConf rssConf, String host, int
grpcPort, int nettyPort) {
- this(rssConf, host, grpcPort, nettyPort, 3);
+ this(
+ rssConf,
+ host,
+ grpcPort,
+ nettyPort,
+ rssConf == null
+ ? RssClientConf.RPC_MAX_ATTEMPTS.defaultValue()
+ : rssConf.getInteger(RssClientConf.RPC_MAX_ATTEMPTS),
+ rssConf == null
+ ? RssClientConf.RPC_TIMEOUT_MS.defaultValue()
+ : rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS));
}
public ShuffleServerGrpcNettyClient(
- RssConf rssConf, String host, int grpcPort, int nettyPort, int
maxRetryAttempts) {
- super(host, grpcPort, maxRetryAttempts);
+ RssConf rssConf,
+ String host,
+ int grpcPort,
+ int nettyPort,
+ int maxRetryAttempts,
+ long rpcTimeoutMs) {
+ super(host, grpcPort, maxRetryAttempts, rpcTimeoutMs);
this.nettyPort = nettyPort;
TransportContext transportContext = new TransportContext(new
TransportConf(rssConf));
this.clientFactory = new TransportClientFactory(transportContext);
@@ -116,7 +132,7 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
System.currentTimeMillis());
long start = System.currentTimeMillis();
RpcResponse rpcResponse =
- transportClient.sendRpcSync(sendShuffleDataRequest,
RPC_TIMEOUT_DEFAULT_MS);
+ transportClient.sendRpcSync(sendShuffleDataRequest,
rpcTimeout);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Do sendShuffleData to {}:{} rpc cost:"
@@ -193,8 +209,7 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
+ "], lastBlockId["
+ request.getLastBlockId()
+ "]";
- RpcResponse rpcResponse =
- transportClient.sendRpcSync(getMemoryShuffleDataRequest,
RPC_TIMEOUT_DEFAULT_MS);
+ RpcResponse rpcResponse =
transportClient.sendRpcSync(getMemoryShuffleDataRequest, rpcTimeout);
GetMemoryShuffleDataResponse getMemoryShuffleDataResponse =
(GetMemoryShuffleDataResponse) rpcResponse;
StatusCode statusCode = rpcResponse.getStatusCode();
@@ -231,8 +246,7 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
request.getPartitionNumPerRange(),
request.getPartitionNum());
long start = System.currentTimeMillis();
- RpcResponse rpcResponse =
- transportClient.sendRpcSync(getLocalShuffleIndexRequest,
RPC_TIMEOUT_DEFAULT_MS);
+ RpcResponse rpcResponse =
transportClient.sendRpcSync(getLocalShuffleIndexRequest, rpcTimeout);
String requestInfo =
"appId["
+ request.getAppId()
@@ -285,8 +299,7 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
request.getLength(),
System.currentTimeMillis());
long start = System.currentTimeMillis();
- RpcResponse rpcResponse =
- transportClient.sendRpcSync(getLocalShuffleIndexRequest,
RPC_TIMEOUT_DEFAULT_MS);
+ RpcResponse rpcResponse =
transportClient.sendRpcSync(getLocalShuffleIndexRequest, rpcTimeout);
String requestInfo =
"appId["
+ request.getAppId()