This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch performance-tuning-2.7.x
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/performance-tuning-2.7.x by
this push:
new b2dbc5e DefaultFuture (#4257)
b2dbc5e is described below
commit b2dbc5ef5da92e3ee5eba06d64a8e00446dd65c7
Author: ken.lj <[email protected]>
AuthorDate: Wed Jun 5 18:41:40 2019 +0800
DefaultFuture (#4257)
---
.../dubbo/demo/provider/DemoServiceImpl.java | 10 ------
.../apache/dubbo/registry/dubbo/MockChannel.java | 8 ++---
.../apache/dubbo/registry/dubbo/MockedClient.java | 14 ++++-----
.../dubbo/remoting/exchange/ExchangeChannel.java | 8 ++---
.../remoting/exchange/support/DefaultFuture.java | 36 +++++++++++++---------
.../support/header/HeaderExchangeChannel.java | 18 +++++------
.../support/header/HeaderExchangeClient.java | 20 ++++++------
.../test/java/org/apache/dubbo/remoting/Main.java | 10 +++---
.../dubbo/remoting/PerformanceClientFixedTest.java | 5 ++-
.../dubbo/remoting/PerformanceClientTest.java | 9 ++++--
.../exchange/support/DefaultFutureTest.java | 9 ++++--
.../support/header/HeaderExchangeChannelTest.java | 9 +++---
.../transport/mina/ClientToServerTest.java | 2 +-
.../transport/netty/ClientToServerTest.java | 2 +-
.../transport/netty4/ClientToServerTest.java | 2 +-
.../rpc/protocol/dubbo/ChannelWrappedInvoker.java | 11 +------
.../dubbo/rpc/protocol/dubbo/DubboInvoker.java | 4 +--
.../protocol/dubbo/LazyConnectExchangeClient.java | 18 +++++------
.../dubbo/ReferenceCountExchangeClient.java | 16 +++++-----
.../dubbo/rpc/protocol/thrift/ThriftInvoker.java | 4 +--
.../dubbo/rpc/protocol/thrift/ThriftCodecTest.java | 5 +--
21 files changed, 108 insertions(+), 112 deletions(-)
diff --git
a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
index 5e57f77..bceeae2 100644
---
a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
+++
b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java
@@ -30,22 +30,12 @@ public class DemoServiceImpl implements DemoService {
@Override
public String sayHello(String name) {
logger.info("Hello " + name + ", request from consumer: " +
RpcContext.getContext().getRemoteAddress());
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
return "Hello " + name + ", response from provider: " +
RpcContext.getContext().getLocalAddress();
}
@Override
public CompletableFuture<String> sayHelloAsync(String name) {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
return "async result";
});
return cf;
diff --git
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
index e563da6..9c43359 100644
---
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
+++
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java
@@ -86,21 +86,21 @@ public class MockChannel implements ExchangeChannel {
return null;
}
- public CompletableFuture<Object> request(Object request) throws
RemotingException {
+ public CompletableFuture<Object> request(Object request, CompletableFuture
completableFuture) throws RemotingException {
return null;
}
- public CompletableFuture<Object> request(Object request, int timeout)
throws RemotingException {
+ public CompletableFuture<Object> request(Object request, int timeout,
CompletableFuture completableFuture) throws RemotingException {
return null;
}
@Override
- public CompletableFuture<Object> request(Object request, ExecutorService
executor) throws RemotingException {
+ public CompletableFuture<Object> request(Object request, ExecutorService
executor, CompletableFuture completableFuture) throws RemotingException {
return null;
}
@Override
- public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor) throws RemotingException {
+ public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor, CompletableFuture completableFuture) throws
RemotingException {
return null;
}
diff --git
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
index ad06cd2..bd62d3a 100644
---
a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
+++
b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java
@@ -81,21 +81,21 @@ public class MockedClient implements ExchangeClient {
this.sent = msg;
}
- public CompletableFuture<Object> request(Object msg) throws
RemotingException {
- return request(msg, null);
+ public CompletableFuture<Object> request(Object msg, CompletableFuture
completableFuture) throws RemotingException {
+ return request(msg, null, completableFuture);
}
- public CompletableFuture<Object> request(Object msg, int timeout) throws
RemotingException {
- return this.request(msg, timeout, null);
+ public CompletableFuture<Object> request(Object msg, int timeout,
CompletableFuture completableFuture) throws RemotingException {
+ return this.request(msg, timeout, null, completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object msg, ExecutorService
executor) throws RemotingException {
- return this.request(msg, 0, executor);
+ public CompletableFuture<Object> request(Object msg, ExecutorService
executor, CompletableFuture completableFuture) throws RemotingException {
+ return this.request(msg, 0, executor, completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object msg, int timeout,
ExecutorService executor) throws RemotingException {
+ public CompletableFuture<Object> request(Object msg, int timeout,
ExecutorService executor, CompletableFuture completableFuture) throws
RemotingException {
this.invoked = msg;
return new CompletableFuture<Object>() {
public Object get() throws InterruptedException,
ExecutionException {
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
index c0cf131..48a9f6f 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java
@@ -35,7 +35,7 @@ public interface ExchangeChannel extends Channel {
* @throws RemotingException
*/
@Deprecated
- CompletableFuture<Object> request(Object request) throws RemotingException;
+ CompletableFuture<Object> request(Object request, CompletableFuture
completableFuture) throws RemotingException;
/**
* send request.
@@ -46,7 +46,7 @@ public interface ExchangeChannel extends Channel {
* @throws RemotingException
*/
@Deprecated
- CompletableFuture<Object> request(Object request, int timeout) throws
RemotingException;
+ CompletableFuture<Object> request(Object request, int timeout,
CompletableFuture completableFuture) throws RemotingException;
/**
* send request.
@@ -55,7 +55,7 @@ public interface ExchangeChannel extends Channel {
* @return response future
* @throws RemotingException
*/
- CompletableFuture<Object> request(Object request, ExecutorService
executor) throws RemotingException;
+ CompletableFuture<Object> request(Object request, ExecutorService
executor, CompletableFuture completableFuture) throws RemotingException;
/**
* send request.
@@ -65,7 +65,7 @@ public interface ExchangeChannel extends Channel {
* @return response future
* @throws RemotingException
*/
- CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor) throws RemotingException;
+ CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor, CompletableFuture completableFuture) throws
RemotingException;
/**
* get message handler.
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index 0abecf2..a23e8cd 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -35,6 +35,7 @@ import java.util.Date;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -44,7 +45,7 @@ import static
org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
/**
* DefaultFuture.
*/
-public class DefaultFuture extends CompletableFuture<Object> {
+public class DefaultFuture {
private static final Logger logger =
LoggerFactory.getLogger(DefaultFuture.class);
@@ -52,6 +53,9 @@ public class DefaultFuture extends CompletableFuture<Object> {
private static final Map<Long, DefaultFuture> FUTURES = new
ConcurrentHashMap<>();
+ // private static final Map<Long, Timeout> PENDING_TASKS = new
ConcurrentHashMap<>();
+ private final CompletableFuture completableFuture;
+
public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-future-timeout", true),
30,
@@ -66,20 +70,18 @@ public class DefaultFuture extends
CompletableFuture<Object> {
private volatile long sent;
private Timeout timeoutCheckTask;
- private ExecutorService executor;
+ private final ExecutorService executor;
public ExecutorService getExecutor() {
return executor;
}
- public void setExecutor(ExecutorService executor) {
- this.executor = executor;
- }
-
- private DefaultFuture(Channel channel, Request request, int timeout) {
+ private DefaultFuture(Channel channel, Request request, int timeout,
ExecutorService executor, CompletableFuture completableFuture) {
this.channel = channel;
this.request = request;
this.id = request.getId();
+ this.executor = executor;
+ this.completableFuture = completableFuture;
this.timeout = timeout > 0 ? timeout :
channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
@@ -104,9 +106,8 @@ public class DefaultFuture extends
CompletableFuture<Object> {
* @param timeout timeout
* @return a new DefaultFuture
*/
- public static DefaultFuture newFuture(Channel channel, Request request,
int timeout, ExecutorService executor) {
- final DefaultFuture future = new DefaultFuture(channel, request,
timeout);
- future.setExecutor(executor);
+ public static DefaultFuture newFuture(Channel channel, Request request,
int timeout, ExecutorService executor, CompletableFuture completableFuture) {
+ final DefaultFuture future = new DefaultFuture(channel, request,
timeout, executor, completableFuture);
// timeout check
timeoutCheck(future);
return future;
@@ -176,7 +177,6 @@ public class DefaultFuture extends
CompletableFuture<Object> {
}
}
- @Override
public boolean cancel(boolean mayInterruptIfRunning) {
Response errorResult = new Response(id);
errorResult.setStatus(Response.CLIENT_ERROR);
@@ -187,6 +187,14 @@ public class DefaultFuture extends
CompletableFuture<Object> {
return true;
}
+ public boolean isDone() {
+ return this.completableFuture.isDone();
+ }
+
+ public Object get() throws ExecutionException, InterruptedException {
+ return this.completableFuture.get();
+ }
+
public void cancel() {
this.cancel(true);
}
@@ -196,11 +204,11 @@ public class DefaultFuture extends
CompletableFuture<Object> {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
- this.complete(res.getResult());
+ this.completableFuture.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT ||
res.getStatus() == Response.SERVER_TIMEOUT) {
- this.completeExceptionally(new TimeoutException(res.getStatus() ==
Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
+ this.completableFuture.completeExceptionally(new
TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel,
res.getErrorMessage()));
} else {
- this.completeExceptionally(new RemotingException(channel,
res.getErrorMessage()));
+ this.completableFuture.completeExceptionally(new
RemotingException(channel, res.getErrorMessage()));
}
// the result is returning, but the caller thread may still waiting
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
index b954612..a82bf53 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java
@@ -106,22 +106,22 @@ final class HeaderExchangeChannel implements
ExchangeChannel {
}
@Override
- public CompletableFuture<Object> request(Object request) throws
RemotingException {
- return request(request, null);
+ public CompletableFuture<Object> request(Object request, CompletableFuture
completableFuture) throws RemotingException {
+ return request(request, null, completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object request, int timeout)
throws RemotingException {
- return request(request, timeout, null);
+ public CompletableFuture<Object> request(Object request, int timeout,
CompletableFuture completableFuture) throws RemotingException {
+ return request(request, timeout, null, completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object request, ExecutorService
executor) throws RemotingException {
- return request(request,
channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor);
+ public CompletableFuture<Object> request(Object request, ExecutorService
executor, CompletableFuture completableFuture) throws RemotingException {
+ return request(request,
channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor,
completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor) throws RemotingException {
+ public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor, CompletableFuture completableFuture) throws
RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed
to send request " + request + ", cause: The channel " + this + " is closed!");
}
@@ -130,14 +130,14 @@ final class HeaderExchangeChannel implements
ExchangeChannel {
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
- DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout,
executor);
+ DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout,
executor, completableFuture);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
- return future;
+ return completableFuture;
}
@Override
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 6671d1a..8e04b99 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -34,11 +34,11 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat;
-import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK;
import static org.apache.dubbo.remoting.Constants.LEAST_HEARTBEAT_DURATION;
import static org.apache.dubbo.remoting.Constants.TICKS_PER_WHEEL;
+import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat;
+import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
/**
* DefaultMessageClient
@@ -66,8 +66,8 @@ public class HeaderExchangeClient implements ExchangeClient {
}
@Override
- public CompletableFuture<Object> request(Object request) throws
RemotingException {
- return channel.request(request);
+ public CompletableFuture<Object> request(Object request, CompletableFuture
completableFuture) throws RemotingException {
+ return channel.request(request, completableFuture);
}
@Override
@@ -81,18 +81,18 @@ public class HeaderExchangeClient implements ExchangeClient
{
}
@Override
- public CompletableFuture<Object> request(Object request, int timeout)
throws RemotingException {
- return channel.request(request, timeout);
+ public CompletableFuture<Object> request(Object request, int timeout,
CompletableFuture completableFuture) throws RemotingException {
+ return channel.request(request, timeout, completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object request, ExecutorService
executor) throws RemotingException {
- return channel.request(request, executor);
+ public CompletableFuture<Object> request(Object request, ExecutorService
executor, CompletableFuture completableFuture) throws RemotingException {
+ return channel.request(request, executor, completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor) throws RemotingException {
- return channel.request(request, timeout, executor);
+ public CompletableFuture<Object> request(Object request, int timeout,
ExecutorService executor, CompletableFuture completableFuture) throws
RemotingException {
+ return channel.request(request, timeout, executor, completableFuture);
}
@Override
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java
index 4478f86..9869bcc 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java
@@ -62,7 +62,7 @@ public class Main {
sb.append("(" + random.nextLong() + ")");
Main.Data d = new Main.Data();
d.setData(sb.toString());
- client.request(d).get();
+ client.request(d, new CompletableFuture()).get();
}
System.out.println("send finished.");
}
@@ -83,18 +83,18 @@ public class Main {
private static void test(int port) throws Exception {
ExchangeChannel client =
Exchangers.connect(URL.valueOf("dubbo://localhost:" + port));
- MockResult result = (MockResult) client.request(new
RpcMessage(DemoService.class.getName(), "plus", new Class<?>[]{int.class,
int.class}, new Object[]{55, 25})).get();
+ MockResult result = (MockResult) client.request(new
RpcMessage(DemoService.class.getName(), "plus", new Class<?>[]{int.class,
int.class}, new Object[]{55, 25}), new CompletableFuture()).get();
System.out.println("55+25=" + result.getResult());
for (int i = 0; i < 100; i++)
- client.request(new RpcMessage(DemoService.class.getName(),
"sayHello", new Class<?>[]{String.class}, new Object[]{"qianlei" + i}));
+ client.request(new RpcMessage(DemoService.class.getName(),
"sayHello", new Class<?>[]{String.class}, new Object[]{"qianlei" + i}), new
CompletableFuture());
for (int i = 0; i < 100; i++)
- client.request(new Main.Data());
+ client.request(new Main.Data(), new CompletableFuture());
System.out.println("=====test invoke=====");
for (int i = 0; i < 100; i++) {
- CompletableFuture<Object> future = client.request(new Main.Data());
+ CompletableFuture<Object> future = client.request(new Main.Data(),
new CompletableFuture());
System.out.println("invoke and get");
System.out.println("invoke result:" + future.get());
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java
index 93ccc76..ab0228f 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
@@ -120,7 +121,9 @@ public class PerformanceClientFixedTest {
int index = rd.nextInt(connectionCount);
ExchangeClient client = arrays.get(index);
// ExchangeClient client = arrays.get(0);
- String output = (String) client.request(messageBlock).get();
+ CompletableFuture cf = new CompletableFuture();
+ client.request(messageBlock, cf);
+ String output = (String) cf.get();
if (output.lastIndexOf(messageBlock) < 0) {
System.out.println("send messageBlock;get " + output);
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java
index f4dfbc2..b86b17d 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java
@@ -28,6 +28,7 @@ import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -72,8 +73,9 @@ public class PerformanceClientTest {
exchangeClients[i] = Exchangers.connect(url);
}
- List<String> serverEnvironment = (List<String>)
exchangeClients[0].request("environment").get();
- List<String> serverScene = (List<String>)
exchangeClients[0].request("scene").get();
+ CompletableFuture<Object> completableFuture = new
CompletableFuture<>();
+ List<String> serverEnvironment = (List<String>)
exchangeClients[0].request("environment", completableFuture).get();
+ List<String> serverScene = (List<String>)
exchangeClients[0].request("scene", completableFuture).get();
// Create some data for test
StringBuilder buf = new StringBuilder(length);
@@ -101,7 +103,8 @@ public class PerformanceClientTest {
count.incrementAndGet();
ExchangeClient client =
exchangeClients[index.getAndIncrement() % connections];
long start = System.currentTimeMillis();
- String result = (String)
client.request(data).get();
+ CompletableFuture<Object> completableFuture =
new CompletableFuture<>();
+ String result = (String) client.request(data,
completableFuture).get();
long end = System.currentTimeMillis();
if (!data.equals(result)) {
throw new IllegalStateException("Invalid
result " + result);
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
index 0f19d15..01db770 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
public class DefaultFutureTest {
@@ -95,7 +96,8 @@ public class DefaultFutureTest {
// timeout after 5 seconds.
Channel channel = new MockedChannel();
Request request = new Request(10);
- DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000,
null);
+ CompletableFuture cf = new CompletableFuture();
+ DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000,
null, cf);
//mark the future is sent
DefaultFuture.sent(channel, request);
while (!f.isDone()) {
@@ -106,7 +108,7 @@ public class DefaultFutureTest {
// get operate will throw a timeout exception, because the future is
timeout.
try {
- f.get();
+ cf.get();
} catch (Exception e) {
Assertions.assertTrue(e.getCause() instanceof TimeoutException,
"catch exception is not timeout exception!");
System.out.println(e.getMessage());
@@ -119,7 +121,8 @@ public class DefaultFutureTest {
private DefaultFuture defaultFuture(int timeout) {
Channel channel = new MockedChannel();
Request request = new Request(index.getAndIncrement());
- return DefaultFuture.newFuture(channel, request, timeout, null);
+ CompletableFuture<Object> completableFuture = new
CompletableFuture<>();
+ return DefaultFuture.newFuture(channel, request, timeout, null,
completableFuture);
}
}
\ No newline at end of file
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
index 2affe7d..873540d 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java
@@ -29,6 +29,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -143,7 +144,7 @@ public class HeaderExchangeChannelTest {
Assertions.assertThrows(RemotingException.class, () -> {
header.close(1000);
Object requestob = new Object();
- header.request(requestob);
+ header.request(requestob, CompletableFuture.completedFuture(0));
});
}
@@ -153,7 +154,7 @@ public class HeaderExchangeChannelTest {
header = new HeaderExchangeChannel(channel);
when(channel.getUrl()).thenReturn(url);
Object requestob = new Object();
- header.request(requestob);
+ header.request(requestob, CompletableFuture.completedFuture(0));
ArgumentCaptor<Request> argumentCaptor =
ArgumentCaptor.forClass(Request.class);
verify(channel, times(1)).send(argumentCaptor.capture());
Assertions.assertEquals(argumentCaptor.getValue().getData(),
requestob);
@@ -170,7 +171,7 @@ public class HeaderExchangeChannelTest {
};
header = new HeaderExchangeChannel(channel);
Object requestob = new Object();
- header.request(requestob, 1000);
+ header.request(requestob, 1000,
CompletableFuture.completedFuture(0));
});
}
@@ -191,7 +192,7 @@ public class HeaderExchangeChannelTest {
public void closeWithTimeoutTest02() {
Assertions.assertFalse(channel.isClosed());
Request request = new Request();
- DefaultFuture.newFuture(channel, request, 100, null);
+ DefaultFuture.newFuture(channel, request, 100, null,
CompletableFuture.completedFuture(0));
header.close(100);
//return directly
header.close(1000);
diff --git
a/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
b/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
index 6414434..bea1860 100644
---
a/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
+++
b/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java
@@ -65,7 +65,7 @@ public abstract class ClientToServerTest {
@Test
public void testFuture() throws Exception {
- CompletableFuture<Object> future = client.request(new World("world"));
+ CompletableFuture<Object> future = client.request(new World("world"),
new CompletableFuture());
Hello result = (Hello) future.get();
Assertions.assertEquals("hello,world", result.getName());
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java
index 267f569..45ecd37 100644
---
a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java
@@ -64,7 +64,7 @@ public abstract class ClientToServerTest {
@Test
public void testFuture() throws Exception {
- CompletableFuture<Object> future = client.request(new World("world"));
+ CompletableFuture<Object> future = client.request(new World("world"),
new CompletableFuture());
Hello result = (Hello) future.get();
Assertions.assertEquals("hello,world", result.getName());
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java
index 9b8db00..a67f92b 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java
@@ -65,7 +65,7 @@ public abstract class ClientToServerTest {
@Test
public void testFuture() throws Exception {
- CompletableFuture<Object> future = client.request(new World("world"));
+ CompletableFuture<Object> future = client.request(new World("world"),
new CompletableFuture());
Hello result = (Hello) future.get();
Assertions.assertEquals("hello,world", result.getName());
}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
index 1018512..6758d04 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
@@ -24,7 +24,6 @@ import org.apache.dubbo.remoting.TimeoutException;
import org.apache.dubbo.remoting.exchange.ExchangeClient;
import org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient;
import org.apache.dubbo.remoting.transport.ClientDelegate;
-import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
@@ -34,7 +33,6 @@ import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.net.InetSocketAddress;
-import java.util.concurrent.CompletableFuture;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
@@ -72,15 +70,8 @@ class ChannelWrappedInvoker<T> extends AbstractInvoker<T> {
currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), SENT_KEY, false));
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
- CompletableFuture<Object> responseFuture = currentClient.request(inv);
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
- responseFuture.whenComplete((appResponse, t) -> {
- if (t != null) {
- asyncRpcResult.completeExceptionally(t);
- } else {
- asyncRpcResult.complete((AppResponse) appResponse);
- }
- });
+ currentClient.request(inv, asyncRpcResult);
return asyncRpcResult;
}
} catch (RpcException e) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 4cf6259..ea7cb23 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -34,7 +34,6 @@ import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;
@@ -98,8 +97,7 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
asyncRpcResult.setExecutor(executor);
- CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout, executor);
- asyncRpcResult.subscribeTo(responseFuture);
+ currentClient.request(inv, timeout, executor, asyncRpcResult);
return asyncRpcResult;
}
} catch (TimeoutException e) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
index 77eae2c..ffc3dfe 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
@@ -35,8 +35,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.remoting.Constants.SEND_RECONNECT_KEY;
-import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY;
import static org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE;
+import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY;
/**
* dubbo protocol support class.
@@ -88,10 +88,10 @@ final class LazyConnectExchangeClient implements ExchangeClient {
}
@Override
- public CompletableFuture<Object> request(Object request) throws RemotingException {
+ public CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException {
warning();
initClient();
- return client.request(request);
+ return client.request(request, completableFuture);
}
@Override
@@ -109,24 +109,24 @@ final class LazyConnectExchangeClient implements ExchangeClient {
}
@Override
- public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
+ public CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException {
warning();
initClient();
- return client.request(request, timeout);
+ return client.request(request, timeout, completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
+ public CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
warning();
initClient();
- return client.request(request, executor);
+ return client.request(request, executor, completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
+ public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
warning();
initClient();
- return client.request(request, timeout, executor);
+ return client.request(request, timeout, executor, completableFuture);
}
/**
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index ed3c61b..c25ed2c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -57,8 +57,8 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
}
@Override
- public CompletableFuture<Object> request(Object request) throws RemotingException {
- return client.request(request);
+ public CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException {
+ return client.request(request, completableFuture);
}
@Override
@@ -77,18 +77,18 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
}
@Override
- public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
- return client.request(request, timeout);
+ public CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException {
+ return client.request(request, timeout, completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
- return client.request(request, executor);
+ public CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
+ return client.request(request, executor, completableFuture);
}
@Override
- public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
- return client.request(request, timeout, executor);
+ public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
+ return client.request(request, timeout, executor, completableFuture);
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
index 3d0aee9..0125ddd 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java
@@ -31,7 +31,6 @@ import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
@@ -93,8 +92,7 @@ public class ThriftInvoker<T> extends AbstractInvoker<T> {
int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
- CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
- asyncRpcResult.subscribeTo(responseFuture);
+ currentClient.request(inv, timeout, asyncRpcResult);
return asyncRpcResult;
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java
index 36fbe6c..405550a 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
+import java.util.concurrent.CompletableFuture;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
@@ -133,7 +134,7 @@ public class ThriftCodecTest {
Request request = createRequest();
- DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, null);
+ DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, null, new CompletableFuture());
TMessage message = new TMessage("echoString", TMessageType.REPLY, ThriftCodec.getSeqId());
@@ -210,7 +211,7 @@ public class ThriftCodecTest {
Request request = createRequest();
- DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, null);
+ DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, null, new CompletableFuture());
TMessage message = new TMessage("echoString", TMessageType.EXCEPTION, ThriftCodec.getSeqId());