This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 469a0b3c63 refactor: inspect code of protocol classes. (#10390)
469a0b3c63 is described below
commit 469a0b3c63a3c4e37f0bd62707b1fc32bc0ed30c
Author: stone lion <[email protected]>
AuthorDate: Mon Aug 1 14:33:02 2022 +0800
refactor: inspect code of protocol classes. (#10390)
* refactor: inspect code of protocol classes.
* refactor: inspect code of protocol classes.
---
.../java/org/apache/dubbo/rpc/RpcInvocation.java | 2 +-
.../dubbo/rpc/proxy/InvokerInvocationHandler.java | 14 ++--
.../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 90 ++++++++++------------
3 files changed, 50 insertions(+), 56 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
index f5ec9fd064..156b8172d5 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
@@ -77,7 +77,7 @@ public class RpcInvocation implements Invocation,
Serializable {
*/
private Map<String, Object> attachments;
- private transient Lock attachmentLock = new ReentrantLock();
+ private final transient Lock attachmentLock = new ReentrantLock();
/**
* Only used on the caller side, will not appear on the wire.
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
index ea75da5f22..255a5ad089 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
@@ -33,16 +33,18 @@ import java.lang.reflect.Method;
*/
public class InvokerInvocationHandler implements InvocationHandler {
private static final Logger logger =
LoggerFactory.getLogger(InvokerInvocationHandler.class);
+
private final Invoker<?> invoker;
- private ServiceModel serviceModel;
- private URL url;
- private String protocolServiceKey;
+
+ private final ServiceModel serviceModel;
+
+ private final String protocolServiceKey;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
- this.url = invoker.getUrl();
- this.protocolServiceKey = this.url.getProtocolServiceKey();
- this.serviceModel = this.url.getServiceModel();
+ URL url = invoker.getUrl();
+ this.protocolServiceKey = url.getProtocolServiceKey();
+ this.serviceModel = url.getServiceModel();
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index dcff5f4692..2d35784a05 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -52,7 +52,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -92,26 +91,28 @@ public class DubboProtocol extends AbstractProtocol {
public static final String NAME = "dubbo";
public static final int DEFAULT_PORT = 20880;
+
private static final String IS_CALLBACK_SERVICE_INVOKE =
"_isCallBackServiceInvoke";
/**
* <host:port,Exchanger>
- * {@link Map<String, List<ReferenceCountExchangeClient>}
+ * Map<String, List<ReferenceCountExchangeClient>
*/
private final Map<String, Object> referenceClientMap = new
ConcurrentHashMap<>();
+
private static final Object PENDING_OBJECT = new Object();
- private AtomicBoolean destroyed = new AtomicBoolean();
+ private final AtomicBoolean destroyed = new AtomicBoolean();
- private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
+ private final ExchangeHandler requestHandler = new
ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object
message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
- + (message == null ? null :
(message.getClass().getName() + ": " + message))
- + ", channel: consumer: " + channel.getRemoteAddress()
+ " --> provider: " + channel.getLocalAddress());
+ + (message == null ? null : (message.getClass().getName()
+ ": " + message))
+ + ", channel: consumer: " + channel.getRemoteAddress() + "
--> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
@@ -138,9 +139,9 @@ public class DubboProtocol extends AbstractProtocol {
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " +
inv.getMethodName()
- + " not found in callback service interface
,invoke will be ignored."
- + " please update the api interface. url is:"
- + invoker.getUrl()) + " ,invocation is :" + inv);
+ + " not found in callback service interface ,invoke
will be ignored."
+ + " please update the api interface. url is:"
+ + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
@@ -243,17 +244,12 @@ public class DubboProtocol extends AbstractProtocol {
return (DubboProtocol)
scopeModel.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME,
false);
}
- @Override
- public Collection<Exporter<?>> getExporters() {
- return Collections.unmodifiableCollection(exporterMap.values());
- }
-
private boolean isClientSide(Channel channel) {
InetSocketAddress address = channel.getRemoteAddress();
URL url = channel.getUrl();
return url.getPort() == address.getPort() &&
- NetUtils.filterLocalHost(channel.getUrl().getIp())
-
.equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));
+ NetUtils.filterLocalHost(channel.getUrl().getIp())
+
.equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));
}
Invoker<?> getInvoker(Channel channel, Invocation inv) throws
RemotingException {
@@ -277,16 +273,16 @@ public class DubboProtocol extends AbstractProtocol {
}
String serviceKey = serviceKey(
- port,
- path,
- (String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),
- (String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY)
+ port,
+ path,
+ (String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),
+ (String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY)
);
DubboExporter<?> exporter = (DubboExporter<?>)
exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service:
" + serviceKey + " in " + exporterMap.keySet() + ", may be version or group
mismatch " +
- ", channel: consumer: " + channel.getRemoteAddress() + "
--> provider: " + channel.getLocalAddress() + ", message:" +
getInvocationWithoutData(inv));
+ ", channel: consumer: " + channel.getRemoteAddress() + " -->
provider: " + channel.getLocalAddress() + ", message:" +
getInvocationWithoutData(inv));
}
return exporter.getInvoker();
@@ -311,14 +307,14 @@ public class DubboProtocol extends AbstractProtocol {
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key,
exporterMap);
//export a stub service for dispatching event
- Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY,
DEFAULT_STUB_EVENT);
- Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE,
false);
- if (isStubSupportEvent && !isCallbackservice) {
+ boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY,
DEFAULT_STUB_EVENT);
+ boolean isCallbackService = url.getParameter(IS_CALLBACK_SERVICE,
false);
+ if (isStubSupportEvent && !isCallbackService) {
String stubServiceMethods =
url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() ==
0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" +
url.getParameter(INTERFACE_KEY) +
- "], has set stubproxy support event ,but no stub
methods founded."));
+ "], has set stub proxy support event ,but no stub
methods founded."));
}
}
@@ -343,14 +339,13 @@ public class DubboProtocol extends AbstractProtocol {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
- }else {
- server.reset(url);
+ return;
}
}
- } else {
- // server supports reset, use together with override
- server.reset(url);
}
+
+ // server supports reset, use together with override
+ server.reset(url);
}
}
@@ -362,16 +357,16 @@ public class DubboProtocol extends AbstractProtocol {
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
- // send readonly event when server closes, it's enabled by
default
- .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY,
Boolean.TRUE.toString())
- // enable heartbeat by default
- .addParameterIfAbsent(HEARTBEAT_KEY,
String.valueOf(DEFAULT_HEARTBEAT))
- .addParameter(CODEC_KEY, DubboCodec.NAME)
- .build();
- String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
+ // send readonly event when server closes, it's enabled by default
+ .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY,
Boolean.TRUE.toString())
+ // enable heartbeat by default
+ .addParameterIfAbsent(HEARTBEAT_KEY,
String.valueOf(DEFAULT_HEARTBEAT))
+ .addParameter(CODEC_KEY, DubboCodec.NAME)
+ .build();
- if (StringUtils.isNotEmpty(str) &&
!url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str))
{
- throw new RpcException("Unsupported server type: " + str + ", url:
" + url);
+ String transporter = url.getParameter(SERVER_KEY,
DEFAULT_REMOTING_SERVER);
+ if (StringUtils.isNotEmpty(transporter) &&
!url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter))
{
+ throw new RpcException("Unsupported server type: " + transporter +
", url: " + url);
}
ExchangeServer server;
@@ -381,12 +376,9 @@ public class DubboProtocol extends AbstractProtocol {
throw new RpcException("Fail to start server(url: " + url + ") " +
e.getMessage(), e);
}
- str = url.getParameter(CLIENT_KEY);
- if (StringUtils.isNotEmpty(str)) {
- Set<String> supportedTypes =
url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions();
- if (!supportedTypes.contains(str)) {
- throw new RpcException("Unsupported client type: " + str);
- }
+ transporter = url.getParameter(CLIENT_KEY);
+ if (StringUtils.isNotEmpty(transporter) &&
!url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter))
{
+ throw new RpcException("Unsupported client type: " + transporter);
}
DubboProtocolServer protocolServer = new DubboProtocolServer(server);
@@ -429,7 +421,7 @@ public class DubboProtocol extends AbstractProtocol {
*/
String shareConnectionsStr =
url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections =
Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ?
ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(),
SHARE_CONNECTIONS_KEY,
- DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
+ DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections);
}
@@ -610,13 +602,13 @@ public class DubboProtocol extends AbstractProtocol {
// BIO is not allowed since it has severe performance issue.
if (StringUtils.isNotEmpty(str) &&
!url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str))
{
throw new RpcException("Unsupported client type: " + str + "," +
- " supported client type is " +
StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(),
" "));
+ " supported client type is " +
StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(),
" "));
}
ExchangeClient client;
try {
// Replace InstanceAddressURL with ServiceConfigURL.
- url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(),
url.getPassword(), url.getHost(), url.getPort(), url.getPath(),
url.getAllParameters());
+ url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(),
url.getPassword(), url.getHost(), url.getPort(), url.getPath(),
url.getAllParameters());
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY,
String.valueOf(DEFAULT_HEARTBEAT));
@@ -661,7 +653,7 @@ public class DubboProtocol extends AbstractProtocol {
server.close(getServerShutdownTimeout(protocolServer));
} catch (Throwable t) {
- logger.warn("Close dubbo server [" + server.getLocalAddress()+
"] failed: " + t.getMessage(), t);
+ logger.warn("Close dubbo server [" + server.getLocalAddress()
+ "] failed: " + t.getMessage(), t);
}
}
serverMap.clear();