This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9622df2d9 [#1175] fix(netty): Retry failed with 
StacklessClosedChannelException after channel closed (#1181)
9622df2d9 is described below

commit 9622df2d9e46e534e8f5d935f334fdbf761cd12e
Author: xumanbu <[email protected]>
AuthorDate: Fri Sep 1 16:57:32 2023 +0800

    [#1175] fix(netty): Retry failed with StacklessClosedChannelException after 
channel closed (#1181)
    
    ### What changes were proposed in this pull request?
    
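    1. Call getTransportClient() inside the RetryCmd of
    ShuffleServerGrpcNettyClient.sendShuffleData, so the TransportClient is
    obtained again on every retry attempt instead of once before the retries.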
    2. Add a ShuffleServerGrpcNettyClient constructor that accepts
    maxRetryAttempts (in this PR the existing constructor still initializes
    ShuffleServerGrpcNettyClient with the default maxRetryAttempts of 3).
    
    ### Why are the changes needed?
    Fix: #1175
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UT.
    
    ---------
    
    Co-authored-by: jam.xu <[email protected]>
---
 .../uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java   | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 1a867fa22..2fe15aae7 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -58,7 +58,12 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
   private TransportClientFactory clientFactory;
 
   public ShuffleServerGrpcNettyClient(RssConf rssConf, String host, int 
grpcPort, int nettyPort) {
-    super(host, grpcPort);
+    this(rssConf, host, grpcPort, nettyPort, 3);
+  }
+
+  public ShuffleServerGrpcNettyClient(
+      RssConf rssConf, String host, int grpcPort, int nettyPort, int 
maxRetryAttempts) {
+    super(host, grpcPort, maxRetryAttempts);
     this.nettyPort = nettyPort;
     TransportContext transportContext = new TransportContext(new 
TransportConf(rssConf));
     this.clientFactory = new TransportClientFactory(transportContext);
@@ -66,7 +71,6 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
 
   @Override
   public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest 
request) {
-    TransportClient transportClient = getTransportClient();
     Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
         request.getShuffleIdToBlocks();
     boolean isSuccessful = true;
@@ -88,6 +92,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
       try {
         RetryUtils.retry(
             () -> {
+              TransportClient transportClient = getTransportClient();
               long requireId =
                   requirePreAllocation(
                       request.getAppId(),

Reply via email to