This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9622df2d9 [#1175] fix(netty): Retry failed with
StacklessClosedChannelException after channel closed (#1181)
9622df2d9 is described below
commit 9622df2d9e46e534e8f5d935f334fdbf761cd12e
Author: xumanbu <[email protected]>
AuthorDate: Fri Sep 1 16:57:32 2023 +0800
[#1175] fix(netty): Retry failed with StacklessClosedChannelException after
channel closed (#1181)
### What changes were proposed in this pull request?
1、reget a transportClient in ShuffleServerGrpcNettyClient.sendShuffleData
RetryCmd
2、add ShuffleServerGrpcNettyClient construct with maxRetryAttempts (but in
this pr init ShuffleServerGrpcNettyClient with default maxRetryAttempts(3))
### Why are the changes needed?
Fix: #1175
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT.
---------
Co-authored-by: jam.xu <[email protected]>
---
.../uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 1a867fa22..2fe15aae7 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -58,7 +58,12 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
private TransportClientFactory clientFactory;
public ShuffleServerGrpcNettyClient(RssConf rssConf, String host, int
grpcPort, int nettyPort) {
- super(host, grpcPort);
+ this(rssConf, host, grpcPort, nettyPort, 3);
+ }
+
+ public ShuffleServerGrpcNettyClient(
+ RssConf rssConf, String host, int grpcPort, int nettyPort, int
maxRetryAttempts) {
+ super(host, grpcPort, maxRetryAttempts);
this.nettyPort = nettyPort;
TransportContext transportContext = new TransportContext(new
TransportConf(rssConf));
this.clientFactory = new TransportClientFactory(transportContext);
@@ -66,7 +71,6 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
@Override
public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest
request) {
- TransportClient transportClient = getTransportClient();
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
request.getShuffleIdToBlocks();
boolean isSuccessful = true;
@@ -88,6 +92,7 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
try {
RetryUtils.retry(
() -> {
+ TransportClient transportClient = getTransportClient();
long requireId =
requirePreAllocation(
request.getAppId(),