This is an automated email from the ASF dual-hosted git repository. xianjingfeng pushed a commit to branch branch-0.8 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 547f61fe4d22362637c3d25c3f77f6322a5371a0 Author: xumanbu <[email protected]> AuthorDate: Fri Sep 1 16:57:32 2023 +0800 [#1175] fix(netty): Retry failed with StacklessClosedChannelException after channel closed (#1181) ### What changes were proposed in this pull request? 1. Re-acquire the TransportClient on every attempt inside the retry command of ShuffleServerGrpcNettyClient.sendShuffleData, instead of obtaining it once before retrying, so that a retry does not reuse a channel that has already been closed. 2. Add a ShuffleServerGrpcNettyClient constructor that accepts maxRetryAttempts (in this PR, the existing constructor still initializes ShuffleServerGrpcNettyClient with the default maxRetryAttempts of 3). ### Why are the changes needed? Fix: #1175 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests. --------- Co-authored-by: jam.xu <[email protected]> (cherry picked from commit 9622df2d9e46e534e8f5d935f334fdbf761cd12e) --- .../uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java index 1a867fa22..2fe15aae7 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java @@ -58,7 +58,12 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { private TransportClientFactory clientFactory; public ShuffleServerGrpcNettyClient(RssConf rssConf, String host, int grpcPort, int nettyPort) { - super(host, grpcPort); + this(rssConf, host, grpcPort, nettyPort, 3); + } + + public ShuffleServerGrpcNettyClient( + RssConf rssConf, String host, int grpcPort, int nettyPort, int maxRetryAttempts) { + super(host, grpcPort, maxRetryAttempts); this.nettyPort = nettyPort; TransportContext transportContext = new TransportContext(new TransportConf(rssConf)); this.clientFactory = new 
TransportClientFactory(transportContext); @@ -66,7 +71,6 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { @Override public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request) { - TransportClient transportClient = getTransportClient(); Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks = request.getShuffleIdToBlocks(); boolean isSuccessful = true; @@ -88,6 +92,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient { try { RetryUtils.retry( () -> { + TransportClient transportClient = getTransportClient(); long requireId = requirePreAllocation( request.getAppId(),
