This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 4d0f9f4f48 refactor: inspect protocol classes (#10444)
4d0f9f4f48 is described below
commit 4d0f9f4f4833317bfde6d73126e5dc386208ebdc
Author: stone lion <[email protected]>
AuthorDate: Fri Aug 12 14:38:15 2022 +0800
refactor: inspect protocol classes (#10444)
---
.../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 69 ++++++++--------------
.../protocol/dubbo/LazyConnectExchangeClient.java | 2 +-
.../dubbo/ReferenceCountExchangeClient.java | 6 +-
3 files changed, 28 insertions(+), 49 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 2d35784a05..b2ccfbe2ac 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -48,10 +48,12 @@ import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -406,35 +408,28 @@ public class DubboProtocol extends AbstractProtocol {
}
private ExchangeClient[] getClients(URL url) {
- // whether to share connection
-
- boolean useShareConnect = false;
-
int connections = url.getParameter(CONNECTIONS_KEY, 0);
- List<ReferenceCountExchangeClient> shareClients = null;
+ // whether to share connection
// if not configured, connection is shared, otherwise, one connection
for one service
if (connections == 0) {
- useShareConnect = true;
-
/*
* The xml configuration should have a higher priority than
properties.
*/
- String shareConnectionsStr =
url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
- connections =
Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ?
ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(),
SHARE_CONNECTIONS_KEY,
- DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
- shareClients = getSharedClient(url, connections);
+ String shareConnectionsStr =
StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))
+ ?
ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(),
SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS)
+ : url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
+ connections = Integer.parseInt(shareConnectionsStr);
+
+ List<ReferenceCountExchangeClient> shareClients =
getSharedClient(url, connections);
+ ExchangeClient[] clients = new ExchangeClient[connections];
+ Arrays.setAll(clients, shareClients::get);
+ return clients;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
- if (useShareConnect) {
- clients[i] = shareClients.get(i);
-
- } else {
- clients[i] = initClient(url);
- }
+ clients[i] = initClient(url);
}
-
return clients;
}
@@ -461,6 +456,7 @@ public class DubboProtocol extends AbstractProtocol {
synchronized (referenceClientMap) {
for (; ; ) {
+ // Guarantee that only one thread is loading at a time; the others
wait until it has finished.
clients = referenceClientMap.get(key);
if (clients instanceof List) {
@@ -513,7 +509,6 @@ public class DubboProtocol extends AbstractProtocol {
}
}
return typedClients;
-
}
/**
@@ -527,14 +522,10 @@ public class DubboProtocol extends AbstractProtocol {
return false;
}
- for (ReferenceCountExchangeClient referenceCountExchangeClient :
referenceCountExchangeClients) {
- // As long as one client is not available, you need to replace the
unavailable client with the available one.
- if (referenceCountExchangeClient == null ||
referenceCountExchangeClient.getCount() <= 0 ||
referenceCountExchangeClient.isClosed()) {
- return false;
- }
- }
-
- return true;
+ // As long as one client is not available, you need to replace the
unavailable client with the available one.
+ return referenceCountExchangeClients.stream()
+ .noneMatch(referenceCountExchangeClient ->
referenceCountExchangeClient == null
+ || referenceCountExchangeClient.getCount() <= 0 ||
referenceCountExchangeClient.isClosed());
}
/**
@@ -546,12 +537,9 @@ public class DubboProtocol extends AbstractProtocol {
if (CollectionUtils.isEmpty(referenceCountExchangeClients)) {
return;
}
-
- for (ReferenceCountExchangeClient referenceCountExchangeClient :
referenceCountExchangeClients) {
- if (referenceCountExchangeClient != null) {
- referenceCountExchangeClient.incrementAndGetCount();
- }
- }
+ referenceCountExchangeClients.stream()
+ .filter(Objects::nonNull)
+ .forEach(ReferenceCountExchangeClient::incrementAndGetCount);
}
/**
@@ -592,8 +580,7 @@ public class DubboProtocol extends AbstractProtocol {
* @param url
*/
private ExchangeClient initClient(URL url) {
-
- /**
+ /*
* Instance of url is InstanceAddressURL, so addParameter actually
adds parameters into ServiceInstance,
* which means params are shared among different services. Since
client is shared among services this is currently not a problem.
*/
@@ -605,7 +592,6 @@ public class DubboProtocol extends AbstractProtocol {
" supported client type is " +
StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(),
" "));
}
- ExchangeClient client;
try {
// Replace InstanceAddressURL with ServiceConfigURL.
url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(),
url.getPassword(), url.getHost(), url.getPort(), url.getPath(),
url.getAllParameters());
@@ -614,17 +600,12 @@ public class DubboProtocol extends AbstractProtocol {
url = url.addParameterIfAbsent(HEARTBEAT_KEY,
String.valueOf(DEFAULT_HEARTBEAT));
// connection should be lazy
- if (url.getParameter(LAZY_CONNECT_KEY, false)) {
- client = new LazyConnectExchangeClient(url, requestHandler);
- } else {
- client = Exchangers.connect(url, requestHandler);
- }
-
+ return url.getParameter(LAZY_CONNECT_KEY, false)
+ ? new LazyConnectExchangeClient(url, requestHandler)
+ : Exchangers.connect(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for
service(" + url + "): " + e.getMessage(), e);
}
-
- return client;
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
index fae3bf7295..7e2d5332c8 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
@@ -51,7 +51,7 @@ final class LazyConnectExchangeClient implements
ExchangeClient {
private final URL url;
private final ExchangeHandler requestHandler;
private final Lock connectLock = new ReentrantLock();
- private final int warningPeriod = 5000;
+ private static final int warningPeriod = 5000;
private final boolean needReconnect;
private volatile ExchangeClient client;
private final AtomicLong warningCount = new AtomicLong(0);
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index 2a904b00d7..b0a8f08e2f 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -43,7 +43,7 @@ final class ReferenceCountExchangeClient implements
ExchangeClient {
private final URL url;
private final AtomicInteger referenceCount = new AtomicInteger(0);
private final AtomicInteger disconnectCount = new AtomicInteger(0);
- private final Integer warningPeriod = 50;
+ private static final Integer warningPeriod = 50;
private ExchangeClient client;
private int shutdownWaitTime = DEFAULT_SERVER_SHUTDOWN_TIMEOUT;
@@ -202,9 +202,7 @@ final class ReferenceCountExchangeClient implements
ExchangeClient {
logger.warn(url.getAddress() + " " + url.getServiceKey() + " safe
guard client , should not be called ,must have a bug.");
}
- /**
- * the order of judgment in the if statement cannot be changed.
- */
+ // the order of judgment in the if statement cannot be changed.
if (!(client instanceof LazyConnectExchangeClient)) {
client = new LazyConnectExchangeClient(url,
client.getExchangeHandler());
}