This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch performance-tuning-2.7.x
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/performance-tuning-2.7.x by 
this push:
     new b2dbc5e  DefaultFuture (#4257)
b2dbc5e is described below

commit b2dbc5ef5da92e3ee5eba06d64a8e00446dd65c7
Author: ken.lj <ken.lj...@gmail.com>
AuthorDate: Wed Jun 5 18:41:40 2019 +0800

    DefaultFuture (#4257)
---
 .../dubbo/demo/provider/DemoServiceImpl.java       | 10 ------
 .../apache/dubbo/registry/dubbo/MockChannel.java   |  8 ++---
 .../apache/dubbo/registry/dubbo/MockedClient.java  | 14 ++++-----
 .../dubbo/remoting/exchange/ExchangeChannel.java   |  8 ++---
 .../remoting/exchange/support/DefaultFuture.java   | 36 +++++++++++++---------
 .../support/header/HeaderExchangeChannel.java      | 18 +++++------
 .../support/header/HeaderExchangeClient.java       | 20 ++++++------
 .../test/java/org/apache/dubbo/remoting/Main.java  | 10 +++---
 .../dubbo/remoting/PerformanceClientFixedTest.java |  5 ++-
 .../dubbo/remoting/PerformanceClientTest.java      |  9 ++++--
 .../exchange/support/DefaultFutureTest.java        |  9 ++++--
 .../support/header/HeaderExchangeChannelTest.java  |  9 +++---
 .../transport/mina/ClientToServerTest.java         |  2 +-
 .../transport/netty/ClientToServerTest.java        |  2 +-
 .../transport/netty4/ClientToServerTest.java       |  2 +-
 .../rpc/protocol/dubbo/ChannelWrappedInvoker.java  | 11 +------
 .../dubbo/rpc/protocol/dubbo/DubboInvoker.java     |  4 +--
 .../protocol/dubbo/LazyConnectExchangeClient.java  | 18 +++++------
 .../dubbo/ReferenceCountExchangeClient.java        | 16 +++++-----
 .../dubbo/rpc/protocol/thrift/ThriftInvoker.java   |  4 +--
 .../dubbo/rpc/protocol/thrift/ThriftCodecTest.java |  5 +--
 21 files changed, 108 insertions(+), 112 deletions(-)

diff --git 
a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
 
b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
index 5e57f77..bceeae2 100644
--- 
a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
+++ 
b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
@@ -30,22 +30,12 @@ public class DemoServiceImpl implements DemoService {
     @Override
     public String sayHello(String name) {
         logger.info("Hello " + name + ", request from consumer: " + 
RpcContext.getContext().getRemoteAddress());
-        try {
-            Thread.sleep(10000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
         return "Hello " + name + ", response from provider: " + 
RpcContext.getContext().getLocalAddress();
     }
 
     @Override
     public CompletableFuture<String> sayHelloAsync(String name) {
         CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
             return "async result";
         });
         return cf;
diff --git 
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
 
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
index e563da6..9c43359 100644
--- 
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
+++ 
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
@@ -86,21 +86,21 @@ public class MockChannel implements ExchangeChannel {
         return null;
     }
 
-    public CompletableFuture<Object> request(Object request) throws 
RemotingException {
+    public CompletableFuture<Object> request(Object request, CompletableFuture 
completableFuture) throws RemotingException {
         return null;
     }
 
-    public CompletableFuture<Object> request(Object request, int timeout) 
throws RemotingException {
+    public CompletableFuture<Object> request(Object request, int timeout, 
CompletableFuture completableFuture) throws RemotingException {
         return null;
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, ExecutorService 
executor) throws RemotingException {
+    public CompletableFuture<Object> request(Object request, ExecutorService 
executor, CompletableFuture completableFuture) throws RemotingException {
         return null;
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor) throws RemotingException {
+    public CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor, CompletableFuture completableFuture) throws 
RemotingException {
         return null;
     }
 
diff --git 
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
 
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
index ad06cd2..bd62d3a 100644
--- 
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
+++ 
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
@@ -81,21 +81,21 @@ public class MockedClient implements ExchangeClient {
         this.sent = msg;
     }
 
-    public CompletableFuture<Object> request(Object msg) throws 
RemotingException {
-        return request(msg, null);
+    public CompletableFuture<Object> request(Object msg, CompletableFuture 
completableFuture) throws RemotingException {
+        return request(msg, null, completableFuture);
     }
 
-    public CompletableFuture<Object> request(Object msg, int timeout) throws 
RemotingException {
-        return this.request(msg, timeout, null);
+    public CompletableFuture<Object> request(Object msg, int timeout, 
CompletableFuture completableFuture) throws RemotingException {
+        return this.request(msg, timeout, null, completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object msg, ExecutorService 
executor) throws RemotingException {
-        return this.request(msg, 0, executor);
+    public CompletableFuture<Object> request(Object msg, ExecutorService 
executor, CompletableFuture completableFuture) throws RemotingException {
+        return this.request(msg, 0, executor, completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object msg, int timeout, 
ExecutorService executor) throws RemotingException {
+    public CompletableFuture<Object> request(Object msg, int timeout, 
ExecutorService executor, CompletableFuture completableFuture) throws 
RemotingException {
         this.invoked = msg;
         return new CompletableFuture<Object>() {
             public Object get() throws InterruptedException, 
ExecutionException {
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
index c0cf131..48a9f6f 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
@@ -35,7 +35,7 @@ public interface ExchangeChannel extends Channel {
      * @throws RemotingException
      */
     @Deprecated
-    CompletableFuture<Object> request(Object request) throws RemotingException;
+    CompletableFuture<Object> request(Object request, CompletableFuture 
completableFuture) throws RemotingException;
 
     /**
      * send request.
@@ -46,7 +46,7 @@ public interface ExchangeChannel extends Channel {
      * @throws RemotingException
      */
     @Deprecated
-    CompletableFuture<Object> request(Object request, int timeout) throws 
RemotingException;
+    CompletableFuture<Object> request(Object request, int timeout, 
CompletableFuture completableFuture) throws RemotingException;
 
     /**
      * send request.
@@ -55,7 +55,7 @@ public interface ExchangeChannel extends Channel {
      * @return response future
      * @throws RemotingException
      */
-    CompletableFuture<Object> request(Object request, ExecutorService 
executor) throws RemotingException;
+    CompletableFuture<Object> request(Object request, ExecutorService 
executor, CompletableFuture completableFuture) throws RemotingException;
 
     /**
      * send request.
@@ -65,7 +65,7 @@ public interface ExchangeChannel extends Channel {
      * @return response future
      * @throws RemotingException
      */
-    CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor) throws RemotingException;
+    CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor, CompletableFuture completableFuture) throws 
RemotingException;
 
     /**
      * get message handler.
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 0abecf2..a23e8cd 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -35,6 +35,7 @@ import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -44,7 +45,7 @@ import static 
org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
 /**
  * DefaultFuture.
  */
-public class DefaultFuture extends CompletableFuture<Object> {
+public class DefaultFuture {
 
     private static final Logger logger = 
LoggerFactory.getLogger(DefaultFuture.class);
 
@@ -52,6 +53,9 @@ public class DefaultFuture extends CompletableFuture<Object> {
 
     private static final Map<Long, DefaultFuture> FUTURES = new 
ConcurrentHashMap<>();
 
+    //    private static final Map<Long, Timeout> PENDING_TASKS = new 
ConcurrentHashMap<>();
+    private final CompletableFuture completableFuture;
+
     public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
             new NamedThreadFactory("dubbo-future-timeout", true),
             30,
@@ -66,20 +70,18 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
     private volatile long sent;
     private Timeout timeoutCheckTask;
 
-    private ExecutorService executor;
+    private final ExecutorService executor;
 
     public ExecutorService getExecutor() {
         return executor;
     }
 
-    public void setExecutor(ExecutorService executor) {
-        this.executor = executor;
-    }
-
-    private DefaultFuture(Channel channel, Request request, int timeout) {
+    private DefaultFuture(Channel channel, Request request, int timeout, 
ExecutorService executor, CompletableFuture completableFuture) {
         this.channel = channel;
         this.request = request;
         this.id = request.getId();
+        this.executor = executor;
+        this.completableFuture = completableFuture;
         this.timeout = timeout > 0 ? timeout : 
channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
         // put into waiting map.
         FUTURES.put(id, this);
@@ -104,9 +106,8 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
      * @param timeout timeout
      * @return a new DefaultFuture
      */
-    public static DefaultFuture newFuture(Channel channel, Request request, 
int timeout, ExecutorService executor) {
-        final DefaultFuture future = new DefaultFuture(channel, request, 
timeout);
-        future.setExecutor(executor);
+    public static DefaultFuture newFuture(Channel channel, Request request, 
int timeout, ExecutorService executor, CompletableFuture completableFuture) {
+        final DefaultFuture future = new DefaultFuture(channel, request, 
timeout, executor, completableFuture);
         // timeout check
         timeoutCheck(future);
         return future;
@@ -176,7 +177,6 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
         }
     }
 
-    @Override
     public boolean cancel(boolean mayInterruptIfRunning) {
         Response errorResult = new Response(id);
         errorResult.setStatus(Response.CLIENT_ERROR);
@@ -187,6 +187,14 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
         return true;
     }
 
+    public boolean isDone() {
+        return this.completableFuture.isDone();
+    }
+
+    public Object get() throws ExecutionException, InterruptedException {
+        return this.completableFuture.get();
+    }
+
     public void cancel() {
         this.cancel(true);
     }
@@ -196,11 +204,11 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
             throw new IllegalStateException("response cannot be null");
         }
         if (res.getStatus() == Response.OK) {
-            this.complete(res.getResult());
+            this.completableFuture.complete(res.getResult());
         } else if (res.getStatus() == Response.CLIENT_TIMEOUT || 
res.getStatus() == Response.SERVER_TIMEOUT) {
-            this.completeExceptionally(new TimeoutException(res.getStatus() == 
Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
+            this.completableFuture.completeExceptionally(new 
TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, 
res.getErrorMessage()));
         } else {
-            this.completeExceptionally(new RemotingException(channel, 
res.getErrorMessage()));
+            this.completableFuture.completeExceptionally(new 
RemotingException(channel, res.getErrorMessage()));
         }
 
         // the result is returning, but the caller thread may still waiting
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
index b954612..a82bf53 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
@@ -106,22 +106,22 @@ final class HeaderExchangeChannel implements 
ExchangeChannel {
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request) throws 
RemotingException {
-        return request(request, null);
+    public CompletableFuture<Object> request(Object request, CompletableFuture 
completableFuture) throws RemotingException {
+        return request(request, null, completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, int timeout) 
throws RemotingException {
-        return request(request, timeout, null);
+    public CompletableFuture<Object> request(Object request, int timeout, 
CompletableFuture completableFuture) throws RemotingException {
+        return request(request, timeout, null, completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, ExecutorService 
executor) throws RemotingException {
-        return request(request, 
channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor);
+    public CompletableFuture<Object> request(Object request, ExecutorService 
executor, CompletableFuture completableFuture) throws RemotingException {
+        return request(request, 
channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor, 
completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor) throws RemotingException {
+    public CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor, CompletableFuture completableFuture) throws 
RemotingException {
         if (closed) {
             throw new RemotingException(this.getLocalAddress(), null, "Failed 
to send request " + request + ", cause: The channel " + this + " is closed!");
         }
@@ -130,14 +130,14 @@ final class HeaderExchangeChannel implements 
ExchangeChannel {
         req.setVersion(Version.getProtocolVersion());
         req.setTwoWay(true);
         req.setData(request);
-        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, 
executor);
+        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, 
executor, completableFuture);
         try {
             channel.send(req);
         } catch (RemotingException e) {
             future.cancel();
             throw e;
         }
-        return future;
+        return completableFuture;
     }
 
     @Override
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 6671d1a..8e04b99 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -34,11 +34,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat;
-import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
 import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK;
 import static org.apache.dubbo.remoting.Constants.LEAST_HEARTBEAT_DURATION;
 import static org.apache.dubbo.remoting.Constants.TICKS_PER_WHEEL;
+import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat;
+import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
 
 /**
  * DefaultMessageClient
@@ -66,8 +66,8 @@ public class HeaderExchangeClient implements ExchangeClient {
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request) throws 
RemotingException {
-        return channel.request(request);
+    public CompletableFuture<Object> request(Object request, CompletableFuture 
completableFuture) throws RemotingException {
+        return channel.request(request, completableFuture);
     }
 
     @Override
@@ -81,18 +81,18 @@ public class HeaderExchangeClient implements ExchangeClient 
{
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, int timeout) 
throws RemotingException {
-        return channel.request(request, timeout);
+    public CompletableFuture<Object> request(Object request, int timeout, 
CompletableFuture completableFuture) throws RemotingException {
+        return channel.request(request, timeout, completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, ExecutorService 
executor) throws RemotingException {
-        return channel.request(request, executor);
+    public CompletableFuture<Object> request(Object request, ExecutorService 
executor, CompletableFuture completableFuture) throws RemotingException {
+        return channel.request(request, executor, completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor) throws RemotingException {
-        return channel.request(request, timeout, executor);
+    public CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor, CompletableFuture completableFuture) throws 
RemotingException {
+        return channel.request(request, timeout, executor, completableFuture);
     }
 
     @Override
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java
index 4478f86..9869bcc 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java
@@ -62,7 +62,7 @@ public class Main {
                 sb.append("(" + random.nextLong() + ")");
             Main.Data d = new Main.Data();
             d.setData(sb.toString());
-            client.request(d).get();
+            client.request(d, new CompletableFuture()).get();
         }
         System.out.println("send finished.");
     }
@@ -83,18 +83,18 @@ public class Main {
 
     private static void test(int port) throws Exception {
         ExchangeChannel client = 
Exchangers.connect(URL.valueOf("dubbo://localhost:" + port));
-        MockResult result = (MockResult) client.request(new 
RpcMessage(DemoService.class.getName(), "plus", new Class<?>[]{int.class, 
int.class}, new Object[]{55, 25})).get();
+        MockResult result = (MockResult) client.request(new 
RpcMessage(DemoService.class.getName(), "plus", new Class<?>[]{int.class, 
int.class}, new Object[]{55, 25}), new CompletableFuture()).get();
         System.out.println("55+25=" + result.getResult());
 
         for (int i = 0; i < 100; i++)
-            client.request(new RpcMessage(DemoService.class.getName(), 
"sayHello", new Class<?>[]{String.class}, new Object[]{"qianlei" + i}));
+            client.request(new RpcMessage(DemoService.class.getName(), 
"sayHello", new Class<?>[]{String.class}, new Object[]{"qianlei" + i}), new 
CompletableFuture());
 
         for (int i = 0; i < 100; i++)
-            client.request(new Main.Data());
+            client.request(new Main.Data(), new CompletableFuture());
 
         System.out.println("=====test invoke=====");
         for (int i = 0; i < 100; i++) {
-            CompletableFuture<Object> future = client.request(new Main.Data());
+            CompletableFuture<Object> future = client.request(new Main.Data(), 
new CompletableFuture());
             System.out.println("invoke and get");
             System.out.println("invoke result:" + future.get());
         }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java
index 93ccc76..ab0228f 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
 import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
@@ -120,7 +121,9 @@ public class PerformanceClientFixedTest  {
                 int index = rd.nextInt(connectionCount);
                 ExchangeClient client = arrays.get(index);
                 // ExchangeClient client = arrays.get(0);
-                String output = (String) client.request(messageBlock).get();
+                CompletableFuture cf = new CompletableFuture();
+                client.request(messageBlock, cf);
+                String output = (String) cf.get();
 
                 if (output.lastIndexOf(messageBlock) < 0) {
                     System.out.println("send messageBlock;get " + output);
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java
index f4dfbc2..b86b17d 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java
@@ -28,6 +28,7 @@ import java.text.DecimalFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -72,8 +73,9 @@ public class PerformanceClientTest  {
             exchangeClients[i] = Exchangers.connect(url);
         }
 
-        List<String> serverEnvironment = (List<String>) 
exchangeClients[0].request("environment").get();
-        List<String> serverScene = (List<String>) 
exchangeClients[0].request("scene").get();
+        CompletableFuture<Object> completableFuture = new 
CompletableFuture<>();
+        List<String> serverEnvironment = (List<String>) 
exchangeClients[0].request("environment", completableFuture).get();
+        List<String> serverScene = (List<String>) 
exchangeClients[0].request("scene", completableFuture).get();
 
         // Create some data for test
         StringBuilder buf = new StringBuilder(length);
@@ -101,7 +103,8 @@ public class PerformanceClientTest  {
                                 count.incrementAndGet();
                                 ExchangeClient client = 
exchangeClients[index.getAndIncrement() % connections];
                                 long start = System.currentTimeMillis();
-                                String result = (String) 
client.request(data).get();
+                                CompletableFuture<Object> completableFuture = 
new CompletableFuture<>();
+                                String result = (String) client.request(data, 
completableFuture).get();
                                 long end = System.currentTimeMillis();
                                 if (!data.equals(result)) {
                                     throw new IllegalStateException("Invalid 
result " + result);
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 0f19d15..01db770 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class DefaultFutureTest {
@@ -95,7 +96,8 @@ public class DefaultFutureTest {
         // timeout after 5 seconds.
         Channel channel = new MockedChannel();
         Request request = new Request(10);
-        DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000, 
null);
+        CompletableFuture cf = new CompletableFuture();
+        DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000, 
null, cf);
         //mark the future is sent
         DefaultFuture.sent(channel, request);
         while (!f.isDone()) {
@@ -106,7 +108,7 @@ public class DefaultFutureTest {
 
         // get operate will throw a timeout exception, because the future is 
timeout.
         try {
-            f.get();
+            cf.get();
         } catch (Exception e) {
             Assertions.assertTrue(e.getCause() instanceof TimeoutException, 
"catch exception is not timeout exception!");
             System.out.println(e.getMessage());
@@ -119,7 +121,8 @@ public class DefaultFutureTest {
     private DefaultFuture defaultFuture(int timeout) {
         Channel channel = new MockedChannel();
         Request request = new Request(index.getAndIncrement());
-        return DefaultFuture.newFuture(channel, request, timeout, null);
+        CompletableFuture<Object> completableFuture = new 
CompletableFuture<>();
+        return DefaultFuture.newFuture(channel, request, timeout, null, 
completableFuture);
     }
 
 }
\ No newline at end of file
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
index 2affe7d..873540d 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
@@ -29,6 +29,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -143,7 +144,7 @@ public class HeaderExchangeChannelTest {
         Assertions.assertThrows(RemotingException.class, () -> {
             header.close(1000);
             Object requestob = new Object();
-            header.request(requestob);
+            header.request(requestob, CompletableFuture.completedFuture(0));
         });
     }
 
@@ -153,7 +154,7 @@ public class HeaderExchangeChannelTest {
         header = new HeaderExchangeChannel(channel);
         when(channel.getUrl()).thenReturn(url);
         Object requestob = new Object();
-        header.request(requestob);
+        header.request(requestob, CompletableFuture.completedFuture(0));
         ArgumentCaptor<Request> argumentCaptor = 
ArgumentCaptor.forClass(Request.class);
         verify(channel, times(1)).send(argumentCaptor.capture());
         Assertions.assertEquals(argumentCaptor.getValue().getData(), 
requestob);
@@ -170,7 +171,7 @@ public class HeaderExchangeChannelTest {
             };
             header = new HeaderExchangeChannel(channel);
             Object requestob = new Object();
-            header.request(requestob, 1000);
+            header.request(requestob, 1000, 
CompletableFuture.completedFuture(0));
         });
     }
 
@@ -191,7 +192,7 @@ public class HeaderExchangeChannelTest {
     public void closeWithTimeoutTest02() {
         Assertions.assertFalse(channel.isClosed());
         Request request = new Request();
-        DefaultFuture.newFuture(channel, request, 100, null);
+        DefaultFuture.newFuture(channel, request, 100, null, 
CompletableFuture.completedFuture(0));
         header.close(100);
         //return directly
         header.close(1000);
diff --git 
a/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
 
b/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
index 6414434..bea1860 100644
--- 
a/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
@@ -65,7 +65,7 @@ public abstract class ClientToServerTest {
 
     @Test
     public void testFuture() throws Exception {
-        CompletableFuture<Object> future = client.request(new World("world"));
+        CompletableFuture<Object> future = client.request(new World("world"), 
new CompletableFuture());
         Hello result = (Hello) future.get();
         Assertions.assertEquals("hello,world", result.getName());
     }
diff --git 
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java
 
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java
index 267f569..45ecd37 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java
@@ -64,7 +64,7 @@ public abstract class ClientToServerTest  {
 
     @Test
     public void testFuture() throws Exception {
-        CompletableFuture<Object> future = client.request(new World("world"));
+        CompletableFuture<Object> future = client.request(new World("world"), 
new CompletableFuture());
         Hello result = (Hello) future.get();
         Assertions.assertEquals("hello,world", result.getName());
     }
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java
index 9b8db00..a67f92b 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java
@@ -65,7 +65,7 @@ public abstract class ClientToServerTest {
 
     @Test
     public void testFuture() throws Exception {
-        CompletableFuture<Object> future = client.request(new World("world"));
+        CompletableFuture<Object> future = client.request(new World("world"), 
new CompletableFuture());
         Hello result = (Hello) future.get();
         Assertions.assertEquals("hello,world", result.getName());
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
index 1018512..6758d04 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
@@ -24,7 +24,6 @@ import org.apache.dubbo.remoting.TimeoutException;
 import org.apache.dubbo.remoting.exchange.ExchangeClient;
 import org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient;
 import org.apache.dubbo.remoting.transport.ClientDelegate;
-import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Result;
@@ -34,7 +33,6 @@ import org.apache.dubbo.rpc.protocol.AbstractInvoker;
 import org.apache.dubbo.rpc.support.RpcUtils;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
@@ -72,15 +70,8 @@ class ChannelWrappedInvoker<T> extends AbstractInvoker<T> {
                 currentClient.send(inv, 
getUrl().getMethodParameter(invocation.getMethodName(), SENT_KEY, false));
                 return AsyncRpcResult.newDefaultAsyncResult(invocation);
             } else {
-                CompletableFuture<Object> responseFuture = 
currentClient.request(inv);
                 AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
-                responseFuture.whenComplete((appResponse, t) -> {
-                    if (t != null) {
-                        asyncRpcResult.completeExceptionally(t);
-                    } else {
-                        asyncRpcResult.complete((AppResponse) appResponse);
-                    }
-                });
+                currentClient.request(inv, asyncRpcResult);
                 return asyncRpcResult;
             }
         } catch (RpcException e) {
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 4cf6259..ea7cb23 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -34,7 +34,6 @@ import org.apache.dubbo.rpc.protocol.AbstractInvoker;
 import org.apache.dubbo.rpc.support.RpcUtils;
 
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -98,8 +97,7 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
                 AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                 ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                 asyncRpcResult.setExecutor(executor);
-                CompletableFuture<Object> responseFuture = 
currentClient.request(inv, timeout, executor);
-                asyncRpcResult.subscribeTo(responseFuture);
+                currentClient.request(inv, timeout, executor, asyncRpcResult);
                 return asyncRpcResult;
             }
         } catch (TimeoutException e) {
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
index 77eae2c..ffc3dfe 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
@@ -35,8 +35,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.dubbo.remoting.Constants.SEND_RECONNECT_KEY;
-import static 
org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY;
 import static 
org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE;
+import static 
org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY;
 
 /**
  * dubbo protocol support class.
@@ -88,10 +88,10 @@ final class LazyConnectExchangeClient implements 
ExchangeClient {
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request) throws 
RemotingException {
+    public CompletableFuture<Object> request(Object request, CompletableFuture 
completableFuture) throws RemotingException {
         warning();
         initClient();
-        return client.request(request);
+        return client.request(request, completableFuture);
     }
 
     @Override
@@ -109,24 +109,24 @@ final class LazyConnectExchangeClient implements 
ExchangeClient {
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, int timeout) 
throws RemotingException {
+    public CompletableFuture<Object> request(Object request, int timeout, 
CompletableFuture completableFuture) throws RemotingException {
         warning();
         initClient();
-        return client.request(request, timeout);
+        return client.request(request, timeout, completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, ExecutorService 
executor) throws RemotingException {
+    public CompletableFuture<Object> request(Object request, ExecutorService 
executor, CompletableFuture completableFuture) throws RemotingException {
         warning();
         initClient();
-        return client.request(request, executor);
+        return client.request(request, executor, completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor) throws RemotingException {
+    public CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor, CompletableFuture completableFuture) throws 
RemotingException {
         warning();
         initClient();
-        return client.request(request, timeout, executor);
+        return client.request(request, timeout, executor, completableFuture);
     }
 
     /**
diff --git 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index ed3c61b..c25ed2c 100644
--- 
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++ 
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -57,8 +57,8 @@ final class ReferenceCountExchangeClient implements 
ExchangeClient {
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request) throws 
RemotingException {
-        return client.request(request);
+    public CompletableFuture<Object> request(Object request, CompletableFuture 
completableFuture) throws RemotingException {
+        return client.request(request, completableFuture);
     }
 
     @Override
@@ -77,18 +77,18 @@ final class ReferenceCountExchangeClient implements 
ExchangeClient {
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, int timeout) 
throws RemotingException {
-        return client.request(request, timeout);
+    public CompletableFuture<Object> request(Object request, int timeout, 
CompletableFuture completableFuture) throws RemotingException {
+        return client.request(request, timeout, completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, ExecutorService 
executor) throws RemotingException {
-        return client.request(request, executor);
+    public CompletableFuture<Object> request(Object request, ExecutorService 
executor, CompletableFuture completableFuture) throws RemotingException {
+        return client.request(request, executor, completableFuture);
     }
 
     @Override
-    public CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor) throws RemotingException {
-        return client.request(request, timeout, executor);
+    public CompletableFuture<Object> request(Object request, int timeout, 
ExecutorService executor, CompletableFuture completableFuture) throws 
RemotingException {
+        return client.request(request, timeout, executor, completableFuture);
     }
 
     @Override
diff --git 
a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
 
b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
index 3d0aee9..0125ddd 100644
--- 
a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
@@ -31,7 +31,6 @@ import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.protocol.AbstractInvoker;
 
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
@@ -93,8 +92,7 @@ public class ThriftInvoker<T> extends AbstractInvoker<T> {
             int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, 
DEFAULT_TIMEOUT);
 
             AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
-            CompletableFuture<Object> responseFuture = 
currentClient.request(inv, timeout);
-            asyncRpcResult.subscribeTo(responseFuture);
+            currentClient.request(inv, timeout, asyncRpcResult);
             return asyncRpcResult;
         } catch (TimeoutException e) {
             throw new RpcException(RpcException.TIMEOUT_EXCEPTION, 
e.getMessage(), e);
diff --git 
a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java
 
b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java
index 36fbe6c..405550a 100644
--- 
a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayInputStream;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
@@ -133,7 +134,7 @@ public class ThriftCodecTest {
 
         Request request = createRequest();
 
-        DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, 
null);
+        DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, 
null, new CompletableFuture());
 
         TMessage message = new TMessage("echoString", TMessageType.REPLY, 
ThriftCodec.getSeqId());
 
@@ -210,7 +211,7 @@ public class ThriftCodecTest {
 
         Request request = createRequest();
 
-        DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, 
null);
+        DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, 
null, new CompletableFuture());
 
         TMessage message = new TMessage("echoString", TMessageType.EXCEPTION, 
ThriftCodec.getSeqId());
 

Reply via email to