This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new c9f3964 add force check (#8064)
c9f3964 is described below
commit c9f39646c3cd6a7a427775e28047bd8c42a76cc6
Author: Albumen Kevin <[email protected]>
AuthorDate: Wed Jun 16 16:06:52 2021 +0800
add force check (#8064)
---
.../rpc/protocol/tri/AbstractClientStream.java | 34 ++++++++++++----------
.../rpc/protocol/tri/AbstractServerStream.java | 10 ++++++-
2 files changed, 28 insertions(+), 16 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index ab7c155..967e14a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -77,13 +77,13 @@ public abstract class AbstractClientStream extends
AbstractStream implements Str
} catch (RejectedExecutionException e) {
LOGGER.error("Consumer's thread pool is full", e);
getStreamSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
- .withDescription("Consumer's thread pool is
full").asException());
+ .withDescription("Consumer's thread pool is
full").asException());
} catch (Throwable t) {
LOGGER.error("Consumer submit request to thread pool error ", t);
getStreamSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withCause(t)
- .withDescription("Consumer's error")
- .asException());
+ .withCause(t)
+ .withDescription("Consumer's error")
+ .asException());
}
}
@@ -128,8 +128,12 @@ public abstract class AbstractClientStream extends
AbstractStream implements Str
}
if (getMethodDescriptor().isNeedWrap()) {
final TripleWrapper.TripleResponseWrapper wrapper =
TripleUtil.unpack(data,
- TripleWrapper.TripleResponseWrapper.class);
- serialize(wrapper.getSerializeType());
+ TripleWrapper.TripleResponseWrapper.class);
+ if
(!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())))
{
+ throw new UnsupportedOperationException("Received
inconsistent serialization type from server, " +
+ "reject to deserialize! Expected:" +
getSerializeType() +
+ " Actual:" +
TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()));
+ }
return TripleUtil.unwrapResp(getUrl(), wrapper,
getMultipleSerialization());
} else {
return TripleUtil.unpack(data,
getMethodDescriptor().getReturnClass());
@@ -142,17 +146,17 @@ public abstract class AbstractClientStream extends
AbstractStream implements Str
protected Metadata createRequestMeta(RpcInvocation inv) {
Metadata metadata = new DefaultMetadata();
metadata.put(TripleConstant.PATH_KEY, "/" +
inv.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + inv.getMethodName())
- .put(TripleConstant.AUTHORITY_KEY, getUrl().getAddress())
- .put(TripleConstant.CONTENT_TYPE_KEY,
TripleConstant.CONTENT_PROTO)
- .put(TripleConstant.TIMEOUT,
inv.get(CommonConstants.TIMEOUT_KEY) + "m")
- .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
+ .put(TripleConstant.AUTHORITY_KEY, getUrl().getAddress())
+ .put(TripleConstant.CONTENT_TYPE_KEY, TripleConstant.CONTENT_PROTO)
+ .put(TripleConstant.TIMEOUT, inv.get(CommonConstants.TIMEOUT_KEY)
+ "m")
+ .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
metadata.putIfNotNull(TripleConstant.SERVICE_VERSION,
inv.getInvoker().getUrl().getVersion())
- .putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
- (String)
inv.getObjectAttachments().remove(CommonConstants.APPLICATION_KEY))
- .putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
- (String)
inv.getObjectAttachments().remove(CommonConstants.REMOTE_APPLICATION_KEY))
- .putIfNotNull(TripleConstant.SERVICE_GROUP,
inv.getInvoker().getUrl().getGroup());
+ .putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
+ (String)
inv.getObjectAttachments().remove(CommonConstants.APPLICATION_KEY))
+ .putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
+ (String)
inv.getObjectAttachments().remove(CommonConstants.REMOTE_APPLICATION_KEY))
+ .putIfNotNull(TripleConstant.SERVICE_GROUP,
inv.getInvoker().getUrl().getGroup());
inv.getObjectAttachments().remove(CommonConstants.GROUP_KEY);
inv.getObjectAttachments().remove(CommonConstants.INTERFACE_KEY);
inv.getObjectAttachments().remove(CommonConstants.PATH_KEY);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 41dfabf..974d088 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -58,6 +59,7 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
protected AbstractServerStream(URL url, Executor executor, ProviderModel
providerModel) {
super(url, executor);
this.providerModel = providerModel;
+ this.serialize(getUrl().getParameter(Constants.SERIALIZATION_KEY,
Constants.DEFAULT_REMOTING_SERIALIZATION));
}
private static Executor lookupExecutor(URL url, ProviderModel
providerModel) {
@@ -142,7 +144,13 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
if (getMethodDescriptor() == null ||
getMethodDescriptor().isNeedWrap()) {
final TripleWrapper.TripleRequestWrapper wrapper =
TripleUtil.unpack(data,
TripleWrapper.TripleRequestWrapper.class);
- serialize(wrapper.getSerializeType());
+ if
(!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())))
{
+
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INVALID_ARGUMENT)
+ .withDescription("Received inconsistent serialization
type from client, " +
+ "reject to deserialize! Expected:" +
getSerializeType() +
+ " Actual:" +
TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())));
+ return null;
+ }
if (getMethodDescriptor() == null) {
final String[] paramTypes =
wrapper.getArgTypesList().toArray(new String[wrapper.getArgsCount()]);