This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new c9f3964  add force check (#8064)
c9f3964 is described below

commit c9f39646c3cd6a7a427775e28047bd8c42a76cc6
Author: Albumen Kevin <[email protected]>
AuthorDate: Wed Jun 16 16:06:52 2021 +0800

    add force check (#8064)
---
 .../rpc/protocol/tri/AbstractClientStream.java     | 34 ++++++++++++----------
 .../rpc/protocol/tri/AbstractServerStream.java     | 10 ++++++-
 2 files changed, 28 insertions(+), 16 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index ab7c155..967e14a 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -77,13 +77,13 @@ public abstract class AbstractClientStream extends 
AbstractStream implements Str
         } catch (RejectedExecutionException e) {
             LOGGER.error("Consumer's thread pool is full", e);
             
getStreamSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
-                    .withDescription("Consumer's thread pool is 
full").asException());
+                .withDescription("Consumer's thread pool is 
full").asException());
         } catch (Throwable t) {
             LOGGER.error("Consumer submit request to thread pool error ", t);
             
getStreamSubscriber().onError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                    .withCause(t)
-                    .withDescription("Consumer's error")
-                    .asException());
+                .withCause(t)
+                .withDescription("Consumer's error")
+                .asException());
         }
     }
 
@@ -128,8 +128,12 @@ public abstract class AbstractClientStream extends 
AbstractStream implements Str
             }
             if (getMethodDescriptor().isNeedWrap()) {
                 final TripleWrapper.TripleResponseWrapper wrapper = 
TripleUtil.unpack(data,
-                        TripleWrapper.TripleResponseWrapper.class);
-                serialize(wrapper.getSerializeType());
+                    TripleWrapper.TripleResponseWrapper.class);
+                if 
(!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())))
 {
+                    throw new UnsupportedOperationException("Received 
inconsistent serialization type from server, " +
+                        "reject to deserialize! Expected:" + 
getSerializeType() +
+                        " Actual:" + 
TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType()));
+                }
                 return TripleUtil.unwrapResp(getUrl(), wrapper, 
getMultipleSerialization());
             } else {
                 return TripleUtil.unpack(data, 
getMethodDescriptor().getReturnClass());
@@ -142,17 +146,17 @@ public abstract class AbstractClientStream extends 
AbstractStream implements Str
     protected Metadata createRequestMeta(RpcInvocation inv) {
         Metadata metadata = new DefaultMetadata();
         metadata.put(TripleConstant.PATH_KEY, "/" + 
inv.getObjectAttachment(CommonConstants.PATH_KEY) + "/" + inv.getMethodName())
-                .put(TripleConstant.AUTHORITY_KEY, getUrl().getAddress())
-                .put(TripleConstant.CONTENT_TYPE_KEY, 
TripleConstant.CONTENT_PROTO)
-                .put(TripleConstant.TIMEOUT, 
inv.get(CommonConstants.TIMEOUT_KEY) + "m")
-                .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
+            .put(TripleConstant.AUTHORITY_KEY, getUrl().getAddress())
+            .put(TripleConstant.CONTENT_TYPE_KEY, TripleConstant.CONTENT_PROTO)
+            .put(TripleConstant.TIMEOUT, inv.get(CommonConstants.TIMEOUT_KEY) 
+ "m")
+            .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
 
         metadata.putIfNotNull(TripleConstant.SERVICE_VERSION, 
inv.getInvoker().getUrl().getVersion())
-                .putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
-                        (String) 
inv.getObjectAttachments().remove(CommonConstants.APPLICATION_KEY))
-                .putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
-                        (String) 
inv.getObjectAttachments().remove(CommonConstants.REMOTE_APPLICATION_KEY))
-                .putIfNotNull(TripleConstant.SERVICE_GROUP, 
inv.getInvoker().getUrl().getGroup());
+            .putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
+                (String) 
inv.getObjectAttachments().remove(CommonConstants.APPLICATION_KEY))
+            .putIfNotNull(TripleConstant.CONSUMER_APP_NAME_KEY,
+                (String) 
inv.getObjectAttachments().remove(CommonConstants.REMOTE_APPLICATION_KEY))
+            .putIfNotNull(TripleConstant.SERVICE_GROUP, 
inv.getInvoker().getUrl().getGroup());
         inv.getObjectAttachments().remove(CommonConstants.GROUP_KEY);
         inv.getObjectAttachments().remove(CommonConstants.INTERFACE_KEY);
         inv.getObjectAttachments().remove(CommonConstants.PATH_KEY);
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 41dfabf..974d088 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -58,6 +59,7 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
     protected AbstractServerStream(URL url, Executor executor, ProviderModel 
providerModel) {
         super(url, executor);
         this.providerModel = providerModel;
+        this.serialize(getUrl().getParameter(Constants.SERIALIZATION_KEY, 
Constants.DEFAULT_REMOTING_SERIALIZATION));
     }
 
     private static Executor lookupExecutor(URL url, ProviderModel 
providerModel) {
@@ -142,7 +144,13 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
             if (getMethodDescriptor() == null || 
getMethodDescriptor().isNeedWrap()) {
                 final TripleWrapper.TripleRequestWrapper wrapper = 
TripleUtil.unpack(data,
                         TripleWrapper.TripleRequestWrapper.class);
-                serialize(wrapper.getSerializeType());
+                if 
(!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())))
 {
+                    
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INVALID_ARGUMENT)
+                        .withDescription("Received inconsistent serialization 
type from client, " +
+                            "reject to deserialize! Expected:" + 
getSerializeType() +
+                            " Actual:" + 
TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())));
+                    return null;
+                }
                 if (getMethodDescriptor() == null) {
                     final String[] paramTypes = 
wrapper.getArgTypesList().toArray(new String[wrapper.getArgsCount()]);
 

Reply via email to