This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 10d9625a8eb3ecb2f61293dff029c411539ae4af
Author: walter <[email protected]>
AuthorDate: Mon Oct 23 11:32:24 2023 +0800

    [fix](rpc) Rebuild failed channel to avoid connection refused (#25688)
---
 .../org/apache/doris/rpc/BackendServiceClient.java    |  9 +++++++++
 .../org/apache/doris/rpc/BackendServiceProxy.java     | 19 +++++++++++++++++--
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 8df33927fd1..e4ece51146b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -27,6 +27,7 @@ import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
 import io.grpc.ClientInterceptor;
+import io.grpc.ConnectivityState;
 import io.grpc.ForwardingClientCall;
 import io.grpc.ManagedChannel;
 import io.grpc.Metadata;
@@ -64,6 +65,14 @@ public class BackendServiceClient {
         execPlanTimeout = Config.remote_fragment_exec_timeout_ms + 5000;
     }
 
+    // Is the underlying channel in a normal state? (That means the RPC call will not fail immediately)
+    public boolean isNormalState() {
+        ConnectivityState state = channel.getState(false);
+        return state == ConnectivityState.CONNECTING
+            || state == ConnectivityState.IDLE
+            || state == ConnectivityState.READY;
+    }
+
     public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
             InternalService.PExecPlanFragmentRequest request) {
         return stub.withDeadlineAfter(execPlanTimeout, TimeUnit.MILLISECONDS)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 5fc3ef2815c..9b5c491df69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -109,18 +109,30 @@ public class BackendServiceProxy {
     private BackendServiceClient getProxy(TNetworkAddress address) throws UnknownHostException {
         String realIp = NetUtils.getIpByHost(address.getHostname());
         BackendServiceClientExtIp serviceClientExtIp = serviceMap.get(address);
-        if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp)) {
+        if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp)
+                && serviceClientExtIp.client.isNormalState()) {
             return serviceClientExtIp.client;
         }
+
         // not exist, create one and return.
+        BackendServiceClient removedClient = null;
         lock.lock();
         try {
             serviceClientExtIp = serviceMap.get(address);
             if (serviceClientExtIp != null && !serviceClientExtIp.realIp.equals(realIp)) {
                 LOG.warn("Cached ip changed ,before ip: {}, curIp: {}", serviceClientExtIp.realIp, realIp);
                 serviceMap.remove(address);
+                removedClient = serviceClientExtIp.client;
+                serviceClientExtIp = null;
+            }
+            if (serviceClientExtIp != null && !serviceClientExtIp.client.isNormalState()) {
+                // At this point we cannot judge the progress of reconnecting the underlying channel.
+                // In the worst case, it may take two minutes. But we can't stand the connection refused
+                // for two minutes, so rebuild the channel directly.
+                serviceMap.remove(address);
+                removedClient = serviceClientExtIp.client;
+                serviceClientExtIp = null;
             }
-            serviceClientExtIp = serviceMap.get(address);
             if (serviceClientExtIp == null) {
                 BackendServiceClient client = new BackendServiceClient(address, grpcThreadPool);
                 serviceMap.put(address, new BackendServiceClientExtIp(realIp, client));
@@ -128,6 +140,9 @@ public class BackendServiceProxy {
             return serviceMap.get(address).client;
         } finally {
             lock.unlock();
+            if (removedClient != null) {
+                removedClient.shutdown();
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to