This is an automated email from the ASF dual-hosted git repository.
carryxyh pushed a commit to branch 3.x-dev
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/3.x-dev by this push:
new 58c1474 optimze TimeTask in DefaultFuture (#4132)
58c1474 is described below
commit 58c147410f284711bf2d350d79e2f8f7bd360b5e
Author: ken.lj <[email protected]>
AuthorDate: Thu May 23 15:30:28 2019 +0800
optimze TimeTask in DefaultFuture (#4132)
optimze TimeTask in DefaultFuture
---
.../remoting/exchange/support/DefaultFuture.java | 42 +++++++++++-----------
1 file changed, 21 insertions(+), 21 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index f7b5239..510e1f6 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -50,8 +50,6 @@ public class DefaultFuture extends CompletableFuture<Object> {
private static final Map<Long, DefaultFuture> FUTURES = new
ConcurrentHashMap<>();
-// private static final Map<Long, Timeout> PENDING_TASKS = new
ConcurrentHashMap<>();
-
public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-future-timeout", true),
30,
@@ -64,6 +62,7 @@ public class DefaultFuture extends CompletableFuture<Object> {
private final int timeout;
private final long start = System.currentTimeMillis();
private volatile long sent;
+ private Timeout timeoutCheckTask;
private ExecutorService executor;
@@ -86,6 +85,14 @@ public class DefaultFuture extends CompletableFuture<Object>
{
}
/**
+ * check time out of the future
+ */
+ private static void timeoutCheck(DefaultFuture future) {
+ TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
+ future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task,
future.getTimeout(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
* init a DefaultFuture
* 1.init a DefaultFuture
* 2.timeout check
@@ -142,15 +149,19 @@ public class DefaultFuture extends
CompletableFuture<Object> {
}
public static void received(Channel channel, Response response) {
+ received(channel, response, false);
+ }
+
+ public static void received(Channel channel, Response response, boolean
timeout) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
+ Timeout t = future.timeoutCheckTask;
+ if (!timeout) {
+ // decrease Time
+ t.cancel();
+ }
future.doReceived(response);
-// Timeout t = PENDING_TASKS.remove(future.getId());
-// if (t != null) {
-// // decrease Time
-// t.cancel();
-// }
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS").format(new Date()))
@@ -178,6 +189,7 @@ public class DefaultFuture extends
CompletableFuture<Object> {
this.cancel(true);
}
+
private void doReceived(Response res) {
if (res == null) {
throw new IllegalStateException("response cannot be null");
@@ -224,15 +236,6 @@ public class DefaultFuture extends
CompletableFuture<Object> {
sent = System.currentTimeMillis();
}
- /**
- * check time out of the future
- */
- private static void timeoutCheck(DefaultFuture future) {
- TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
- TIME_OUT_TIMER.newTimeout(task, future.getTimeout(),
TimeUnit.MILLISECONDS);
-// PENDING_TASKS.put(future.getId(), t);
- }
-
private String getTimeoutMessage(boolean scan) {
long nowTimestamp = System.currentTimeMillis();
return (sent > 0 ? "Waiting server-side response timeout" : "Sending
request timeout in client-side")
@@ -256,10 +259,7 @@ public class DefaultFuture extends
CompletableFuture<Object> {
@Override
public void run(Timeout timeout) {
- // remove from pending task
-// PENDING_TASKS.remove(future.getId());
-
- DefaultFuture future = FUTURES.remove(requestID);
+ DefaultFuture future = DefaultFuture.getFuture(requestID);
if (future == null || future.isDone()) {
return;
}
@@ -271,7 +271,7 @@ public class DefaultFuture extends
CompletableFuture<Object> {
timeoutResponse.setStatus(future.isSent() ?
Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
- DefaultFuture.received(future.getChannel(),
timeoutResponse);
+ DefaultFuture.received(future.getChannel(),
timeoutResponse, true);
});
}
}