This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 4d0f9f4f48 refactor: inspect protocol classes (#10444)
4d0f9f4f48 is described below

commit 4d0f9f4f4833317bfde6d73126e5dc386208ebdc
Author: stone lion <[email protected]>
AuthorDate: Fri Aug 12 14:38:15 2022 +0800

    refactor: inspect protocol classes (#10444)
---
 .../dubbo/rpc/protocol/dubbo/DubboProtocol.java    | 69 ++++++++--------------
 .../protocol/dubbo/LazyConnectExchangeClient.java  |  2 +-
 .../dubbo/ReferenceCountExchangeClient.java        |  6 +-
 3 files changed, 28 insertions(+), 49 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 2d35784a05..b2ccfbe2ac 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -48,10 +48,12 @@ import org.apache.dubbo.rpc.protocol.AbstractProtocol;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -406,35 +408,28 @@ public class DubboProtocol extends AbstractProtocol {
     }
 
     private ExchangeClient[] getClients(URL url) {
-        // whether to share connection
-
-        boolean useShareConnect = false;
-
         int connections = url.getParameter(CONNECTIONS_KEY, 0);
-        List<ReferenceCountExchangeClient> shareClients = null;
+        // whether to share connection
         // if not configured, connection is shared, otherwise, one connection 
for one service
         if (connections == 0) {
-            useShareConnect = true;
-
             /*
              * The xml configuration should have a higher priority than 
properties.
              */
-            String shareConnectionsStr = 
url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
-            connections = 
Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? 
ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), 
SHARE_CONNECTIONS_KEY,
-                DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
-            shareClients = getSharedClient(url, connections);
+            String shareConnectionsStr = 
StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))
+                ? 
ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), 
SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS)
+                : url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
+            connections = Integer.parseInt(shareConnectionsStr);
+
+            List<ReferenceCountExchangeClient> shareClients = 
getSharedClient(url, connections);
+            ExchangeClient[] clients = new ExchangeClient[connections];
+            Arrays.setAll(clients, shareClients::get);
+            return clients;
         }
 
         ExchangeClient[] clients = new ExchangeClient[connections];
         for (int i = 0; i < clients.length; i++) {
-            if (useShareConnect) {
-                clients[i] = shareClients.get(i);
-
-            } else {
-                clients[i] = initClient(url);
-            }
+            clients[i] = initClient(url);
         }
-
         return clients;
     }
 
@@ -461,6 +456,7 @@ public class DubboProtocol extends AbstractProtocol {
 
         synchronized (referenceClientMap) {
             for (; ; ) {
+                // guarantee just one thread in loading condition. And Other 
is waiting It had finished.
                 clients = referenceClientMap.get(key);
 
                 if (clients instanceof List) {
@@ -513,7 +509,6 @@ public class DubboProtocol extends AbstractProtocol {
             }
         }
         return typedClients;
-
     }
 
     /**
@@ -527,14 +522,10 @@ public class DubboProtocol extends AbstractProtocol {
             return false;
         }
 
-        for (ReferenceCountExchangeClient referenceCountExchangeClient : 
referenceCountExchangeClients) {
-            // As long as one client is not available, you need to replace the 
unavailable client with the available one.
-            if (referenceCountExchangeClient == null || 
referenceCountExchangeClient.getCount() <= 0 || 
referenceCountExchangeClient.isClosed()) {
-                return false;
-            }
-        }
-
-        return true;
+        // As long as one client is not available, you need to replace the 
unavailable client with the available one.
+        return referenceCountExchangeClients.stream()
+            .noneMatch(referenceCountExchangeClient -> 
referenceCountExchangeClient == null
+                || referenceCountExchangeClient.getCount() <= 0 || 
referenceCountExchangeClient.isClosed());
     }
 
     /**
@@ -546,12 +537,9 @@ public class DubboProtocol extends AbstractProtocol {
         if (CollectionUtils.isEmpty(referenceCountExchangeClients)) {
             return;
         }
-
-        for (ReferenceCountExchangeClient referenceCountExchangeClient : 
referenceCountExchangeClients) {
-            if (referenceCountExchangeClient != null) {
-                referenceCountExchangeClient.incrementAndGetCount();
-            }
-        }
+        referenceCountExchangeClients.stream()
+            .filter(Objects::nonNull)
+            .forEach(ReferenceCountExchangeClient::incrementAndGetCount);
     }
 
     /**
@@ -592,8 +580,7 @@ public class DubboProtocol extends AbstractProtocol {
      * @param url
      */
     private ExchangeClient initClient(URL url) {
-
-        /**
+        /*
          * Instance of url is InstanceAddressURL, so addParameter actually 
adds parameters into ServiceInstance,
          * which means params are shared among different services. Since 
client is shared among services this is currently not a problem.
          */
@@ -605,7 +592,6 @@ public class DubboProtocol extends AbstractProtocol {
                 " supported client type is " + 
StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(),
 " "));
         }
 
-        ExchangeClient client;
         try {
             // Replace InstanceAddressURL with ServiceConfigURL.
             url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), 
url.getPassword(), url.getHost(), url.getPort(), url.getPath(), 
url.getAllParameters());
@@ -614,17 +600,12 @@ public class DubboProtocol extends AbstractProtocol {
             url = url.addParameterIfAbsent(HEARTBEAT_KEY, 
String.valueOf(DEFAULT_HEARTBEAT));
 
             // connection should be lazy
-            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
-                client = new LazyConnectExchangeClient(url, requestHandler);
-            } else {
-                client = Exchangers.connect(url, requestHandler);
-            }
-
+            return url.getParameter(LAZY_CONNECT_KEY, false)
+                ? new LazyConnectExchangeClient(url, requestHandler)
+                : Exchangers.connect(url, requestHandler);
         } catch (RemotingException e) {
             throw new RpcException("Fail to create remoting client for 
service(" + url + "): " + e.getMessage(), e);
         }
-
-        return client;
     }
 
     @Override
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
index fae3bf7295..7e2d5332c8 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
@@ -51,7 +51,7 @@ final class LazyConnectExchangeClient implements 
ExchangeClient {
     private final URL url;
     private final ExchangeHandler requestHandler;
     private final Lock connectLock = new ReentrantLock();
-    private final int warningPeriod = 5000;
+    private static final int warningPeriod = 5000;
     private final boolean needReconnect;
     private volatile ExchangeClient client;
     private final AtomicLong warningCount = new AtomicLong(0);
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index 2a904b00d7..b0a8f08e2f 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -43,7 +43,7 @@ final class ReferenceCountExchangeClient implements 
ExchangeClient {
     private final URL url;
     private final AtomicInteger referenceCount = new AtomicInteger(0);
     private final AtomicInteger disconnectCount = new AtomicInteger(0);
-    private final Integer warningPeriod = 50;
+    private static final Integer warningPeriod = 50;
     private ExchangeClient client;
     private int shutdownWaitTime = DEFAULT_SERVER_SHUTDOWN_TIMEOUT;
 
@@ -202,9 +202,7 @@ final class ReferenceCountExchangeClient implements 
ExchangeClient {
             logger.warn(url.getAddress() + " " + url.getServiceKey() + " safe 
guard client , should not be called ,must have a bug.");
         }
 
-        /**
-         * the order of judgment in the if statement cannot be changed.
-         */
+        // the order of judgment in the if statement cannot be changed.
         if (!(client instanceof LazyConnectExchangeClient)) {
             client = new LazyConnectExchangeClient(url, 
client.getExchangeHandler());
         }

Reply via email to