This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 7264850fd4 Fix service context early restore (#12643)
7264850fd4 is described below
commit 7264850fd45f2a4d06b07081d0d8e1eab10fc8db
Author: Albumen Kevin <[email protected]>
AuthorDate: Mon Jul 3 10:26:08 2023 +0800
Fix service context early restore (#12643)
---
.../filter/support/ConsumerContextFilter.java | 91 ++++++++++------------
.../org/apache/dubbo/rpc/proxy/InvocationUtil.java | 87 +++++++++++----------
2 files changed, 90 insertions(+), 88 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
index 582578b5e9..a5a6dcf534 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
@@ -62,63 +62,58 @@ public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Liste
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws
RpcException {
- RpcContext.RestoreServiceContext originServiceContext =
RpcContext.storeServiceContext();
- try {
- RpcContext.getServiceContext()
- .setInvoker(invoker)
- .setInvocation(invocation)
- .setLocalAddress(NetUtils.getLocalHost(), 0);
+ RpcContext.getServiceContext()
+ .setInvoker(invoker)
+ .setInvocation(invocation)
+ .setLocalAddress(NetUtils.getLocalHost(), 0);
- RpcContext context = RpcContext.getClientAttachment();
- context.setAttachment(REMOTE_APPLICATION_KEY,
invoker.getUrl().getApplication());
- if (invocation instanceof RpcInvocation) {
- ((RpcInvocation) invocation).setInvoker(invoker);
- }
+ RpcContext context = RpcContext.getClientAttachment();
+ context.setAttachment(REMOTE_APPLICATION_KEY,
invoker.getUrl().getApplication());
+ if (invocation instanceof RpcInvocation) {
+ ((RpcInvocation) invocation).setInvoker(invoker);
+ }
- if (CollectionUtils.isNotEmpty(supportedSelectors)) {
- for (PenetrateAttachmentSelector supportedSelector :
supportedSelectors) {
- Map<String, Object> selected =
supportedSelector.select(invocation, RpcContext.getClientAttachment(),
RpcContext.getServerAttachment());
- if (CollectionUtils.isNotEmptyMap(selected)) {
- ((RpcInvocation)
invocation).addObjectAttachments(selected);
- }
+ if (CollectionUtils.isNotEmpty(supportedSelectors)) {
+ for (PenetrateAttachmentSelector supportedSelector :
supportedSelectors) {
+ Map<String, Object> selected =
supportedSelector.select(invocation, RpcContext.getClientAttachment(),
RpcContext.getServerAttachment());
+ if (CollectionUtils.isNotEmptyMap(selected)) {
+ ((RpcInvocation)
invocation).addObjectAttachments(selected);
}
- } else {
- ((RpcInvocation)
invocation).addObjectAttachments(RpcContext.getServerAttachment().getObjectAttachments());
- }
- Map<String, Object> contextAttachments =
RpcContext.getClientAttachment().getObjectAttachments();
- if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
- /**
- * invocation.addAttachmentsIfAbsent(context){@link
RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
- * because the {@link RpcContext#setAttachment(String,
String)} is passed in the Filter when the call is triggered
- * by the built-in retry mechanism of the Dubbo. The
attachment to update RpcContext will no longer work, which is
- * a mistake in most cases (for example, through Filter to
RpcContext output traceId and spanId and other information).
- */
- ((RpcInvocation)
invocation).addObjectAttachments(contextAttachments);
}
+ } else {
+ ((RpcInvocation)
invocation).addObjectAttachments(RpcContext.getServerAttachment().getObjectAttachments());
+ }
+ Map<String, Object> contextAttachments =
RpcContext.getClientAttachment().getObjectAttachments();
+ if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
+ /**
+ * invocation.addAttachmentsIfAbsent(context){@link
RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
+ * because the {@link RpcContext#setAttachment(String, String)} is
passed in the Filter when the call is triggered
+ * by the built-in retry mechanism of the Dubbo. The attachment to
update RpcContext will no longer work, which is
+ * a mistake in most cases (for example, through Filter to
RpcContext output traceId and spanId and other information).
+ */
+ ((RpcInvocation)
invocation).addObjectAttachments(contextAttachments);
+ }
- // pass default timeout set by end user (ReferenceConfig)
- Object countDown =
RpcContext.getServerAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
- if (countDown != null) {
- String methodName = RpcUtils.getMethodName(invocation);
- // When the client has enabled the timeout-countdown function,
- // the subsequent calls launched by the Server side will be
enabled by default,
- // and support to turn off the function on a node to get rid
of the timeout control.
- if (invoker.getUrl().getMethodParameter(methodName,
ENABLE_TIMEOUT_COUNTDOWN_KEY, true)) {
- context.setObjectAttachment(TIME_COUNTDOWN_KEY, countDown);
+ // pass default timeout set by end user (ReferenceConfig)
+ Object countDown =
RpcContext.getServerAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
+ if (countDown != null) {
+ String methodName = RpcUtils.getMethodName(invocation);
+ // When the client has enabled the timeout-countdown function,
+ // the subsequent calls launched by the Server side will be
enabled by default,
+ // and support to turn off the function on a node to get rid of
the timeout control.
+ if (invoker.getUrl().getMethodParameter(methodName,
ENABLE_TIMEOUT_COUNTDOWN_KEY, true)) {
+ context.setObjectAttachment(TIME_COUNTDOWN_KEY, countDown);
- TimeoutCountDown timeoutCountDown = (TimeoutCountDown)
countDown;
- if (timeoutCountDown.isExpired()) {
- return AsyncRpcResult.newDefaultAsyncResult(new
RpcException(RpcException.TIMEOUT_TERMINATE,
- "No time left for making the following call: " +
invocation.getServiceName() + "."
- + RpcUtils.getMethodName(invocation) + ",
terminate directly."), invocation);
- }
+ TimeoutCountDown timeoutCountDown = (TimeoutCountDown)
countDown;
+ if (timeoutCountDown.isExpired()) {
+ return AsyncRpcResult.newDefaultAsyncResult(new
RpcException(RpcException.TIMEOUT_TERMINATE,
+ "No time left for making the following call: " +
invocation.getServiceName() + "."
+ + RpcUtils.getMethodName(invocation) + ",
terminate directly."), invocation);
}
}
- RpcContext.removeClientResponseContext();
- return invoker.invoke(invocation);
- } finally {
- RpcContext.restoreServiceContext(originServiceContext);
}
+ RpcContext.removeClientResponseContext();
+ return invoker.invoke(invocation);
}
@Override
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvocationUtil.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvocationUtil.java
index f7b3a7c89b..cc4c258f6a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvocationUtil.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvocationUtil.java
@@ -24,6 +24,7 @@ import org.apache.dubbo.common.profiler.Profiler;
import org.apache.dubbo.common.profiler.ProfilerEntry;
import org.apache.dubbo.common.profiler.ProfilerSwitch;
import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcServiceContext;
import org.apache.dubbo.rpc.support.RpcUtils;
@@ -36,53 +37,59 @@ public class InvocationUtil {
private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(InvokerInvocationHandler.class);
public static Object invoke(Invoker<?> invoker, RpcInvocation
rpcInvocation) throws Throwable {
- URL url = invoker.getUrl();
- String serviceKey = url.getServiceKey();
- rpcInvocation.setTargetServiceUniqueName(serviceKey);
+ RpcContext.RestoreServiceContext originServiceContext =
RpcContext.storeServiceContext();
- // invoker.getUrl() returns consumer url.
- RpcServiceContext.getServiceContext().setConsumerUrl(url);
+ try {
+ URL url = invoker.getUrl();
+ String serviceKey = url.getServiceKey();
+ rpcInvocation.setTargetServiceUniqueName(serviceKey);
- if (ProfilerSwitch.isEnableSimpleProfiler()) {
- ProfilerEntry parentProfiler = Profiler.getBizProfiler();
- ProfilerEntry bizProfiler;
- if (parentProfiler != null) {
- bizProfiler = Profiler.enter(parentProfiler,
- "Receive request. Client invoke begin. ServiceKey: " +
serviceKey + " MethodName:" + rpcInvocation.getMethodName());
- } else {
- bizProfiler = Profiler.start("Receive request. Client invoke
begin. ServiceKey: " + serviceKey + " " + "MethodName:" +
rpcInvocation.getMethodName());
- }
- rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);
- try {
- return invoker.invoke(rpcInvocation).recreate();
- } finally {
- Profiler.release(bizProfiler);
- Long timeout =
RpcUtils.convertToNumber(rpcInvocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY));
+ // invoker.getUrl() returns consumer url.
+ RpcServiceContext.getServiceContext().setConsumerUrl(url);
- if (timeout == null) {
- timeout = (long)
url.getMethodPositiveParameter(rpcInvocation.getMethodName(),
- TIMEOUT_KEY,
- DEFAULT_TIMEOUT);
+ if (ProfilerSwitch.isEnableSimpleProfiler()) {
+ ProfilerEntry parentProfiler = Profiler.getBizProfiler();
+ ProfilerEntry bizProfiler;
+ if (parentProfiler != null) {
+ bizProfiler = Profiler.enter(parentProfiler,
+ "Receive request. Client invoke begin. ServiceKey: " +
serviceKey + " MethodName:" + rpcInvocation.getMethodName());
+ } else {
+ bizProfiler = Profiler.start("Receive request. Client
invoke begin. ServiceKey: " + serviceKey + " " + "MethodName:" +
rpcInvocation.getMethodName());
}
- long usage = bizProfiler.getEndTime() -
bizProfiler.getStartTime();
- if ((usage / (1000_000L * ProfilerSwitch.getWarnPercent())) >
timeout) {
- StringBuilder attachment = new StringBuilder();
- rpcInvocation.foreachAttachment((entry) -> {
-
attachment.append(entry.getKey()).append("=").append(entry.getValue()).append(";\n");
- });
+ rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);
+ try {
+ return invoker.invoke(rpcInvocation).recreate();
+ } finally {
+ Profiler.release(bizProfiler);
+ Long timeout =
RpcUtils.convertToNumber(rpcInvocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY));
+
+ if (timeout == null) {
+ timeout = (long)
url.getMethodPositiveParameter(rpcInvocation.getMethodName(),
+ TIMEOUT_KEY,
+ DEFAULT_TIMEOUT);
+ }
+ long usage = bizProfiler.getEndTime() -
bizProfiler.getStartTime();
+ if ((usage / (1000_000L *
ProfilerSwitch.getWarnPercent())) > timeout) {
+ StringBuilder attachment = new StringBuilder();
+ rpcInvocation.foreachAttachment((entry) -> {
+
attachment.append(entry.getKey()).append("=").append(entry.getValue()).append(";\n");
+ });
- logger.warn(PROXY_TIMEOUT_REQUEST, "", "", String.format(
- "[Dubbo-Consumer] execute service %s#%s cost %d.%06d
ms, this invocation almost (maybe already) timeout. Timeout: %dms\n" +
"invocation context:\n%s" + "thread info: \n%s",
- rpcInvocation.getProtocolServiceKey(),
- rpcInvocation.getMethodName(),
- usage / 1000_000,
- usage % 1000_000,
- timeout,
- attachment,
- Profiler.buildDetail(bizProfiler)));
+ logger.warn(PROXY_TIMEOUT_REQUEST, "", "",
String.format(
+ "[Dubbo-Consumer] execute service %s#%s cost
%d.%06d ms, this invocation almost (maybe already) timeout. Timeout: %dms\n" +
"invocation context:\n%s" + "thread info: \n%s",
+ rpcInvocation.getProtocolServiceKey(),
+ rpcInvocation.getMethodName(),
+ usage / 1000_000,
+ usage % 1000_000,
+ timeout,
+ attachment,
+ Profiler.buildDetail(bizProfiler)));
+ }
}
}
+ return invoker.invoke(rpcInvocation).recreate();
+ } finally {
+ RpcContext.restoreServiceContext(originServiceContext);
}
- return invoker.invoke(rpcInvocation).recreate();
}
}