This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 47ec659 remove wrapper AsyncToSyncInvoker
47ec659 is described below
commit 47ec6598b880a50a8c750e31152f5ef5cfb9b12f
Author: ken.lj <[email protected]>
AuthorDate: Tue Nov 24 14:45:51 2020 +0800
remove wrapper AsyncToSyncInvoker
---
.../apache/dubbo/rpc/protocol/AbstractInvoker.java | 36 ++++++++
.../dubbo/rpc/protocol/AbstractProtocol.java | 3 +-
.../dubbo/rpc/protocol/AsyncToSyncInvoker.java | 102 ---------------------
.../rpc/protocol/dubbo/CallbackServiceCodec.java | 3 +-
.../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 5 +
.../protocol/dubbo/DubboInvokerAvilableTest.java | 6 +-
.../dubbo/ReferenceCountExchangeClientTest.java | 3 +-
7 files changed, 48 insertions(+), 110 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index f8a5ac9..737a4ad 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -26,6 +26,8 @@ import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.TimeoutException;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.InvokeMode;
@@ -41,7 +43,9 @@ import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* This Invoker works on Consumer side.
@@ -178,9 +182,41 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e,
invocation);
}
RpcContext.getContext().setFuture(new
FutureAdapter(asyncResult.getResponseFuture()));
+
+ waitForResultIfSync(asyncResult, invocation);
return asyncResult;
}
+ private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
+ try {
+ if (InvokeMode.SYNC == invocation.getInvokeMode()) {
+ /**
+ * NOTICE!
+ * must call {@link
java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
+ * {@link java.util.concurrent.CompletableFuture#get()} was
proved to have serious performance drop.
+ */
+ asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+ } catch (InterruptedException e) {
+ throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof TimeoutException) {
+ throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
+ } else if (t instanceof RemotingException) {
+ throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
+ } else {
+ throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
+ }
+ } catch (Throwable e) {
+ throw new RpcException(e.getMessage(), e);
+ }
+ }
+
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
ExecutorService sharedExecutor =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
index 903cd27..cb77c68 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
@@ -98,9 +98,10 @@ public abstract class AbstractProtocol implements Protocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
- return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
+ return protocolBindingRefer(type, url);
}
+ @Deprecated
protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL
url) throws RpcException;
public Map<String, Exporter<?>> getExporterMap() {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
deleted file mode 100644
index 69a3a28..0000000
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.remoting.RemotingException;
-import org.apache.dubbo.remoting.TimeoutException;
-import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.InvokeMode;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.RpcInvocation;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This class will work as a wrapper wrapping outside of each protocol invoker.
- *
- * @param <T>
- */
-public class AsyncToSyncInvoker<T> implements Invoker<T> {
-
- private Invoker<T> invoker;
-
- public AsyncToSyncInvoker(Invoker<T> invoker) {
- this.invoker = invoker;
- }
-
- @Override
- public Class<T> getInterface() {
- return invoker.getInterface();
- }
-
- @Override
- public Result invoke(Invocation invocation) throws RpcException {
- Result asyncResult = invoker.invoke(invocation);
-
- try {
- if (InvokeMode.SYNC == ((RpcInvocation)
invocation).getInvokeMode()) {
- /**
- * NOTICE!
- * must call {@link
java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
- * {@link java.util.concurrent.CompletableFuture#get()} was
proved to have serious performance drop.
- */
- asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
- }
- } catch (InterruptedException e) {
- throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +
- invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- if (t instanceof TimeoutException) {
- throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
- invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
- } else if (t instanceof RemotingException) {
- throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
- invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
- } else {
- throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
- invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
- }
- } catch (Throwable e) {
- throw new RpcException(e.getMessage(), e);
- }
- return asyncResult;
- }
-
- @Override
- public URL getUrl() {
- return invoker.getUrl();
- }
-
- @Override
- public boolean isAvailable() {
- return invoker.isAvailable();
- }
-
- @Override
- public void destroy() {
- invoker.destroy();
- }
-
- public Invoker<T> getInvoker() {
- return invoker;
- }
-}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
index 607f291..e2634f1 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
@@ -32,7 +32,6 @@ import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
-import org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker;
import java.io.IOException;
import java.util.HashMap;
@@ -168,7 +167,7 @@ class CallbackServiceCodec {
ApplicationModel.getServiceRepository().registerService(clazz);
@SuppressWarnings("rawtypes")
Invoker<?> invoker = new ChannelWrappedInvoker(clazz,
channel, referurl, String.valueOf(instid));
- proxy = PROXY_FACTORY.getProxy(new
AsyncToSyncInvoker<>(invoker));
+ proxy = PROXY_FACTORY.getProxy(invoker);
channel.setAttribute(proxyCacheKey, proxy);
channel.setAttribute(invokerCacheKey, invoker);
increaseInstanceCount(channel, countkey);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 0c9204a..20dd018 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -397,6 +397,11 @@ public class DubboProtocol extends AbstractProtocol {
}
@Override
+ public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
+ return protocolBindingRefer(type, url);
+ }
+
+ @Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url)
throws RpcException {
optimizeSerialization(url);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java
index d2c318b..a3f1af3 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java
@@ -23,8 +23,8 @@ import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.exchange.ExchangeClient;
import org.apache.dubbo.rpc.Exporter;
+import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.ProxyFactory;
-import org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker;
import org.apache.dubbo.rpc.protocol.dubbo.support.ProtocolUtils;
import org.junit.jupiter.api.AfterAll;
@@ -133,10 +133,10 @@ public class DubboInvokerAvilableTest {
URL url = URL.valueOf("dubbo://127.0.0.1:" + port +
"/org.apache.dubbo.rpc.protocol.dubbo.IDemoService?lazy=true&connections=1&timeout=10000");
ProtocolUtils.export(new DemoServiceImpl(), IDemoService.class, url);
- AsyncToSyncInvoker<?> invoker = (AsyncToSyncInvoker)
protocol.refer(IDemoService.class, url);
+ Invoker<?> invoker = protocol.refer(IDemoService.class, url);
Assertions.assertTrue(invoker.isAvailable());
- ExchangeClient exchangeClient = getClients((DubboInvoker<?>)
invoker.getInvoker())[0];
+ ExchangeClient exchangeClient = getClients((DubboInvoker<?>)
invoker)[0];
Assertions.assertFalse(exchangeClient.isClosed());
try {
exchangeClient.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY,
Boolean.TRUE);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java
index ce82068..3341192 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java
@@ -25,7 +25,6 @@ import org.apache.dubbo.remoting.exchange.ExchangeClient;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.ProxyFactory;
-import org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker;
import org.apache.dubbo.rpc.protocol.dubbo.support.ProtocolUtils;
import org.junit.jupiter.api.AfterAll;
@@ -275,7 +274,7 @@ public class ReferenceCountExchangeClientTest {
}
private List<ExchangeClient> getInvokerClientList(Invoker<?> invoker) {
- @SuppressWarnings("rawtypes") DubboInvoker dInvoker = (DubboInvoker)
((AsyncToSyncInvoker) invoker).getInvoker();
+ @SuppressWarnings("rawtypes") DubboInvoker dInvoker = (DubboInvoker)
invoker;
try {
Field clientField = DubboInvoker.class.getDeclaredField("clients");
clientField.setAccessible(true);