This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 47ec659  remove wrapper AsyncToSyncInvoker
47ec659 is described below

commit 47ec6598b880a50a8c750e31152f5ef5cfb9b12f
Author: ken.lj <[email protected]>
AuthorDate: Tue Nov 24 14:45:51 2020 +0800

    remove wrapper AsyncToSyncInvoker
---
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |  36 ++++++++
 .../dubbo/rpc/protocol/AbstractProtocol.java       |   3 +-
 .../dubbo/rpc/protocol/AsyncToSyncInvoker.java     | 102 ---------------------
 .../rpc/protocol/dubbo/CallbackServiceCodec.java   |   3 +-
 .../dubbo/rpc/protocol/dubbo/DubboProtocol.java    |   5 +
 .../protocol/dubbo/DubboInvokerAvilableTest.java   |   6 +-
 .../dubbo/ReferenceCountExchangeClientTest.java    |   3 +-
 7 files changed, 48 insertions(+), 110 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index f8a5ac9..737a4ad 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -26,6 +26,8 @@ import 
org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.TimeoutException;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.InvokeMode;
@@ -41,7 +43,9 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This Invoker works on Consumer side.
@@ -178,9 +182,41 @@ public abstract class AbstractInvoker<T> implements 
Invoker<T> {
             asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, 
invocation);
         }
         RpcContext.getContext().setFuture(new 
FutureAdapter(asyncResult.getResponseFuture()));
+
+        waitForResultIfSync(asyncResult, invocation);
         return asyncResult;
     }
 
+    private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation 
invocation) {
+        try {
+            if (InvokeMode.SYNC == invocation.getInvokeMode()) {
+                /**
+                 * NOTICE!
+                 * must call {@link 
java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
+                 * {@link java.util.concurrent.CompletableFuture#get()} was 
proved to have serious performance drop.
+                 */
+                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
+            }
+        } catch (InterruptedException e) {
+            throw new RpcException("Interrupted unexpectedly while waiting for 
remote result to return!  method: " +
+                    invocation.getMethodName() + ", provider: " + getUrl() + 
", cause: " + e.getMessage(), e);
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof TimeoutException) {
+                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke 
remote method timeout. method: " +
+                        invocation.getMethodName() + ", provider: " + getUrl() 
+ ", cause: " + e.getMessage(), e);
+            } else if (t instanceof RemotingException) {
+                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed 
to invoke remote method: " +
+                        invocation.getMethodName() + ", provider: " + getUrl() 
+ ", cause: " + e.getMessage(), e);
+            } else {
+                throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail 
to invoke remote method: " +
+                        invocation.getMethodName() + ", provider: " + getUrl() 
+ ", cause: " + e.getMessage(), e);
+            }
+        } catch (Throwable e) {
+            throw new RpcException(e.getMessage(), e);
+        }
+    }
+
     protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
         ExecutorService sharedExecutor = 
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
         if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
index 903cd27..cb77c68 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractProtocol.java
@@ -98,9 +98,10 @@ public abstract class AbstractProtocol implements Protocol {
 
     @Override
     public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
-        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
+        return protocolBindingRefer(type, url);
     }
 
+    @Deprecated
     protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL 
url) throws RpcException;
 
     public Map<String, Exporter<?>> getExporterMap() {
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
deleted file mode 100644
index 69a3a28..0000000
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.rpc.protocol;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.remoting.RemotingException;
-import org.apache.dubbo.remoting.TimeoutException;
-import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.InvokeMode;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.RpcInvocation;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This class will work as a wrapper wrapping outside of each protocol invoker.
- *
- * @param <T>
- */
-public class AsyncToSyncInvoker<T> implements Invoker<T> {
-
-    private Invoker<T> invoker;
-
-    public AsyncToSyncInvoker(Invoker<T> invoker) {
-        this.invoker = invoker;
-    }
-
-    @Override
-    public Class<T> getInterface() {
-        return invoker.getInterface();
-    }
-
-    @Override
-    public Result invoke(Invocation invocation) throws RpcException {
-        Result asyncResult = invoker.invoke(invocation);
-
-        try {
-            if (InvokeMode.SYNC == ((RpcInvocation) 
invocation).getInvokeMode()) {
-                /**
-                 * NOTICE!
-                 * must call {@link 
java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
-                 * {@link java.util.concurrent.CompletableFuture#get()} was 
proved to have serious performance drop.
-                 */
-                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
-            }
-        } catch (InterruptedException e) {
-            throw new RpcException("Interrupted unexpectedly while waiting for 
remote result to return!  method: " +
-                    invocation.getMethodName() + ", provider: " + getUrl() + 
", cause: " + e.getMessage(), e);
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof TimeoutException) {
-                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke 
remote method timeout. method: " +
-                        invocation.getMethodName() + ", provider: " + getUrl() 
+ ", cause: " + e.getMessage(), e);
-            } else if (t instanceof RemotingException) {
-                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed 
to invoke remote method: " +
-                        invocation.getMethodName() + ", provider: " + getUrl() 
+ ", cause: " + e.getMessage(), e);
-            } else {
-                throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail 
to invoke remote method: " +
-                        invocation.getMethodName() + ", provider: " + getUrl() 
+ ", cause: " + e.getMessage(), e);
-            }
-        } catch (Throwable e) {
-            throw new RpcException(e.getMessage(), e);
-        }
-        return asyncResult;
-    }
-
-    @Override
-    public URL getUrl() {
-        return invoker.getUrl();
-    }
-
-    @Override
-    public boolean isAvailable() {
-        return invoker.isAvailable();
-    }
-
-    @Override
-    public void destroy() {
-        invoker.destroy();
-    }
-
-    public Invoker<T> getInvoker() {
-        return invoker;
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
index 607f291..e2634f1 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java
@@ -32,7 +32,6 @@ import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.ProxyFactory;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.model.ApplicationModel;
-import org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -168,7 +167,7 @@ class CallbackServiceCodec {
                     
ApplicationModel.getServiceRepository().registerService(clazz);
                     @SuppressWarnings("rawtypes")
                     Invoker<?> invoker = new ChannelWrappedInvoker(clazz, 
channel, referurl, String.valueOf(instid));
-                    proxy = PROXY_FACTORY.getProxy(new 
AsyncToSyncInvoker<>(invoker));
+                    proxy = PROXY_FACTORY.getProxy(invoker);
                     channel.setAttribute(proxyCacheKey, proxy);
                     channel.setAttribute(invokerCacheKey, invoker);
                     increaseInstanceCount(channel, countkey);
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 0c9204a..20dd018 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -397,6 +397,11 @@ public class DubboProtocol extends AbstractProtocol {
     }
 
     @Override
+    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
+        return protocolBindingRefer(type, url);
+    }
+
+    @Override
     public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) 
throws RpcException {
         optimizeSerialization(url);
 
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java
index d2c318b..a3f1af3 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvilableTest.java
@@ -23,8 +23,8 @@ import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.exchange.ExchangeClient;
 import org.apache.dubbo.rpc.Exporter;
+import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.ProxyFactory;
-import org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker;
 import org.apache.dubbo.rpc.protocol.dubbo.support.ProtocolUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -133,10 +133,10 @@ public class DubboInvokerAvilableTest {
         URL url = URL.valueOf("dubbo://127.0.0.1:" + port + 
"/org.apache.dubbo.rpc.protocol.dubbo.IDemoService?lazy=true&connections=1&timeout=10000");
         ProtocolUtils.export(new DemoServiceImpl(), IDemoService.class, url);
 
-        AsyncToSyncInvoker<?> invoker = (AsyncToSyncInvoker) 
protocol.refer(IDemoService.class, url);
+        Invoker<?> invoker = protocol.refer(IDemoService.class, url);
         Assertions.assertTrue(invoker.isAvailable());
 
-        ExchangeClient exchangeClient = getClients((DubboInvoker<?>) 
invoker.getInvoker())[0];
+        ExchangeClient exchangeClient = getClients((DubboInvoker<?>) 
invoker)[0];
         Assertions.assertFalse(exchangeClient.isClosed());
         try {
             
exchangeClient.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, 
Boolean.TRUE);
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java
index ce82068..3341192 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClientTest.java
@@ -25,7 +25,6 @@ import org.apache.dubbo.remoting.exchange.ExchangeClient;
 import org.apache.dubbo.rpc.Exporter;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.ProxyFactory;
-import org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker;
 import org.apache.dubbo.rpc.protocol.dubbo.support.ProtocolUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -275,7 +274,7 @@ public class ReferenceCountExchangeClientTest {
     }
 
     private List<ExchangeClient> getInvokerClientList(Invoker<?> invoker) {
-        @SuppressWarnings("rawtypes") DubboInvoker dInvoker = (DubboInvoker) 
((AsyncToSyncInvoker) invoker).getInvoker();
+        @SuppressWarnings("rawtypes") DubboInvoker dInvoker = (DubboInvoker) 
invoker;
         try {
             Field clientField = DubboInvoker.class.getDeclaredField("clients");
             clientField.setAccessible(true);

Reply via email to