This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 7a5680b559 Fix countdown assignment problem (#11275)
7a5680b559 is described below
commit 7a5680b559e73b461bda4b1b07586c0e06357391
Author: huazhongming <[email protected]>
AuthorDate: Thu Jan 12 17:32:55 2023 +0800
Fix countdown assignment problem (#11275)
---
.../filter/support/ConsumerContextFilter.java | 3 +-
.../org/apache/dubbo/rpc/support/RpcUtils.java | 26 ++++++++++++++++++
.../dubbo/rpc/protocol/dubbo/DubboInvoker.java | 26 +-----------------
.../dubbo/rpc/protocol/injvm/InjvmInvoker.java | 26 +-----------------
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 32 ++--------------------
5 files changed, 31 insertions(+), 82 deletions(-)
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
index e7069ae139..39bf784221 100644
---
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
+++
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
@@ -41,7 +41,6 @@ import java.util.Set;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static
org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY;
-import static
org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
/**
@@ -106,7 +105,7 @@ public class ConsumerContextFilter implements
ClusterFilter, ClusterFilter.Liste
// the subsequent calls launched by the Server side will be
enabled by default,
// and support to turn off the function on a node to get rid
of the timeout control.
if (invoker.getUrl().getMethodParameter(methodName,
ENABLE_TIMEOUT_COUNTDOWN_KEY, true)) {
- context.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY,
countDown);
+ context.setObjectAttachment(TIME_COUNTDOWN_KEY, countDown);
TimeoutCountDown timeoutCountDown = (TimeoutCountDown)
countDown;
if (timeoutCountDown.isExpired()) {
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java
index c37083e682..9e6d70e00a 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java
@@ -25,19 +25,23 @@ import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.InvokeMode;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.service.GenericService;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE;
import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE_ASYNC;
+import static
org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.GENERIC_PARAMETER_DESC;
import static
org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY_LOWER;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static
org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_REFLECTIVE_OPERATION_FAILED;
import static org.apache.dubbo.rpc.Constants.$ECHO;
import static org.apache.dubbo.rpc.Constants.$ECHO_PARAMETER_DESC;
@@ -276,6 +280,28 @@ public class RpcUtils {
return timeout;
}
+ public static int calculateTimeout(URL url, Invocation invocation, String
methodName, long defaultTimeout) { // resolves the timeout for this call and, where applicable, attaches it to the invocation
+ Object countdown =
RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
+ int timeout = (int) defaultTimeout; // stays in effect only when there is no countdown and url is null; NOTE(review): long-to-int narrowing, confirm callers never pass values above Integer.MAX_VALUE
+ if (countdown == null) {
+ if (url != null) {
+ timeout = (int) RpcUtils.getTimeout(url, methodName,
RpcContext.getClientAttachment(), invocation, defaultTimeout);
+ if (url.getMethodParameter(methodName,
ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
+ // no countdown in the client context, but the url enables it for this method: pass the resolved timeout to the remote server
+ invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY,
timeout);
+ }
+ }
+ } else {
+ TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
+ timeout = (int)
timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
+ // a countdown is present in the client context: pass its remaining milliseconds to the remote server
+ invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);
+ }
+ // the TIME_COUNTDOWN_KEY entry is always removed from the invocation's attachments before returning
+ invocation.getObjectAttachments().remove(TIME_COUNTDOWN_KEY);
+ return timeout;
+ }
+
private static long convertToNumber(Object obj, long defaultTimeout) {
long timeout = defaultTimeout;
try {
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index 9be288b14b..4bdc607b9e 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -29,28 +29,22 @@ import org.apache.dubbo.rpc.FutureContext;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_VERSION;
-import static
org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
-import static
org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
-import static
org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_ERROR_CLOSE_CLIENT;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
@@ -101,7 +95,7 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
- int timeout = calculateTimeout(invocation, methodName);
+ int timeout = RpcUtils.calculateTimeout(getUrl(), invocation,
methodName, DEFAULT_TIMEOUT);
if (timeout <= 0) {
return AsyncRpcResult.newDefaultAsyncResult(new
RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " +
invocation.getServiceName() + "."
@@ -173,22 +167,4 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
}
}
}
-
- private int calculateTimeout(Invocation invocation, String methodName) {
- Object countdown =
RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
- int timeout;
- if (countdown == null) {
- timeout = (int) RpcUtils.getTimeout(getUrl(), methodName,
RpcContext.getClientAttachment(), invocation, DEFAULT_TIMEOUT);
- if (getUrl().getMethodParameter(methodName,
ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
- invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY,
timeout); // pass timeout to remote server
- }
- } else {
- TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
- timeout = (int)
timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
- invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);//
pass timeout to remote server
- }
-
- invocation.getObjectAttachments().remove(TIME_COUNTDOWN_KEY);
- return timeout;
- }
}
diff --git
a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
index 3fb72d0a54..04189a0e61 100644
---
a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java
@@ -34,7 +34,6 @@ import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
-import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceModel;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
@@ -46,14 +45,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
-import static
org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.LOCALHOST_VALUE;
-import static
org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
-import static
org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
/**
@@ -106,7 +101,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
invocation.setAttachment(Constants.TOKEN_KEY,
serverURL.getParameter(Constants.TOKEN_KEY));
}
- int timeout = calculateTimeout(invocation, invocation.getMethodName());
+ int timeout = RpcUtils.calculateTimeout(getUrl(), invocation,
invocation.getMethodName(), DEFAULT_TIMEOUT);
if (timeout <= 0) {
return AsyncRpcResult.newDefaultAsyncResult(new
RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " +
invocation.getServiceName() + "."
@@ -278,23 +273,4 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
}
return remoteUrl.getParameter(ASYNC_KEY, false);
}
-
- private int calculateTimeout(Invocation invocation, String methodName) {
- Object countdown =
RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
- int timeout;
- if (countdown == null) {
- timeout = (int) RpcUtils.getTimeout(getUrl(), methodName,
RpcContext.getClientAttachment(), invocation, DEFAULT_TIMEOUT);
- if (getUrl().getMethodParameter(methodName,
ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
- invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY,
timeout); // pass timeout to remote server
- }
- } else {
- TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
- timeout = (int)
timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
- invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);//
pass timeout to remote server
- }
-
- invocation.getObjectAttachments().remove(TIME_COUNTDOWN_KEY);
- return timeout;
- }
-
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index fbd89717b1..eb37464149 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.protocol.tri;
+import io.netty.util.AsciiString;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
@@ -33,7 +34,6 @@ import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
@@ -49,22 +49,16 @@ import
org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
import org.apache.dubbo.rpc.support.RpcUtils;
-import io.netty.util.AsciiString;
-
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
-import static
org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
-import static
org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
-import static
org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_DESTROY_INVOKER;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
@@ -195,7 +189,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
ClientCall call) {
ExecutorService callbackExecutor = getCallbackExecutor(getUrl(),
invocation);
- int timeout = calculateTimeout(invocation, invocation.getMethodName());
+ int timeout = RpcUtils.calculateTimeout(getUrl(), invocation,
invocation.getMethodName(), 3000);
if (timeout <= 0) {
return AsyncRpcResult.newDefaultAsyncResult(new
RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " +
invocation.getServiceName() + "."
@@ -299,26 +293,4 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
}
}
}
-
- private int calculateTimeout(Invocation invocation, String methodName) {
- Object countdown =
RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
- int timeout;
- if (countdown == null) {
- timeout = (int) RpcUtils.getTimeout(getUrl(), methodName,
- RpcContext.getClientAttachment(), invocation, 3000);
- if (getUrl().getMethodParameter(methodName,
ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
- invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY,
- timeout); // pass timeout to remote server
- }
- } else {
- TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
- timeout = (int)
timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
- invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY,
- timeout);// pass timeout to remote server
- }
-
- invocation.getObjectAttachments().remove(TIME_COUNTDOWN_KEY);
- return timeout;
- }
-
}