This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 7f91631  [3.0-Triple] Support triple server stream (#8542)
7f91631 is described below

commit 7f91631b447b68d9a9381b9f75b84c1feead1c91
Author: GuoHao <[email protected]>
AuthorDate: Mon Aug 23 10:48:16 2021 +0800

    [3.0-Triple] Support triple server stream (#8542)
    
    * Support triple server stream
    
    * Fix provider NPE
---
 .../apache/dubbo/rpc/model/MethodDescriptor.java   | 41 ++++++++++------------
 .../apache/dubbo/descriptor/DescriptorService.java |  5 +++
 .../dubbo/descriptor/MethodDescriptorTest.java     | 14 ++++++++
 .../dubbo/rpc/protocol/tri/ServerStream.java       | 15 +++++++-
 .../rpc/protocol/tri/TripleClientHandler.java      | 29 ++++++++++-----
 5 files changed, 72 insertions(+), 32 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
index b49aff0..8ce071a 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
@@ -38,6 +38,7 @@ import static 
org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE
  *
  */
 public class MethodDescriptor {
+    private static final Logger logger = 
LoggerFactory.getLogger(MethodDescriptor.class);
     private final Method method;
     //    private final boolean isCallBack;
     //    private final boolean isFuture;
@@ -49,10 +50,9 @@ public class MethodDescriptor {
     private final Type[] returnTypes;
     private final String methodName;
     private final boolean generic;
+    private final boolean wrap;
     private final RpcType rpcType;
-
     private final ConcurrentMap<String, Object> attributeMap = new 
ConcurrentHashMap<>();
-    private static final Logger logger = 
LoggerFactory.getLogger(MethodDescriptor.class);
 
     public MethodDescriptor(Method method) {
         this.method = method;
@@ -63,20 +63,18 @@ public class MethodDescriptor {
                     (Class<?>) ((ParameterizedType) 
method.getGenericReturnType()).getActualTypeArguments()[0]};
             this.returnClass = (Class<?>) ((ParameterizedType) 
method.getGenericParameterTypes()[0])
                     .getActualTypeArguments()[0];
-            if (needWrap()) {
-                rpcType = RpcType.STREAM_WRAP;
-            } else {
-                rpcType = RpcType.STREAM_UNWRAP;
-            }
+            this.rpcType = RpcType.BIDIRECTIONAL_STREAM;
+        } else if (parameterTypes.length == 2 && 
method.getReturnType().equals(Void.TYPE)
+                && !isStreamType(parameterTypes[0]) && 
isStreamType(parameterTypes[1])) {
+            this.parameterClasses = method.getParameterTypes();
+            this.returnClass = (Class<?>) 
((ParameterizedType)method.getGenericParameterTypes()[1]).getActualTypeArguments()[0];
+            this.rpcType = RpcType.SERVER_STREAM;
         } else {
             this.parameterClasses = method.getParameterTypes();
             this.returnClass = method.getReturnType();
-            if (needWrap()) {
-                rpcType = RpcType.UNARY_WRAP;
-            } else {
-                rpcType = RpcType.UNARY_UNWRAP;
-            }
+            this.rpcType = RpcType.UNARY;
         }
+        this.wrap = needWrap();
         Type[] returnTypesResult;
         try {
             returnTypesResult = ReflectUtils.getReturnTypes(method);
@@ -98,15 +96,19 @@ public class MethodDescriptor {
     }
 
     public boolean isStream() {
-        return rpcType.equals(RpcType.STREAM_WRAP) || 
rpcType.equals(RpcType.STREAM_UNWRAP);
+        return rpcType.equals(RpcType.SERVER_STREAM) || 
rpcType.equals(RpcType.BIDIRECTIONAL_STREAM) || 
rpcType.equals(RpcType.CLIENT_STREAM);
     }
 
     public boolean isUnary() {
-        return rpcType.equals(RpcType.UNARY_WRAP) || 
rpcType.equals(RpcType.UNARY_UNWRAP);
+        return rpcType.equals(RpcType.UNARY);
     }
 
     public boolean isNeedWrap() {
-        return rpcType.equals(RpcType.UNARY_WRAP) || 
rpcType.equals(RpcType.STREAM_WRAP);
+        return wrap;
+    }
+
+    public RpcType getRpcType() {
+        return rpcType;
     }
 
     private boolean needWrap() {
@@ -115,7 +117,7 @@ public class MethodDescriptor {
         } else if ($ECHO.equals(methodName)) {
             return true;
         } else {
-            if (parameterClasses.length != 1 || parameterClasses[0] == null) {
+            if ((rpcType != RpcType.SERVER_STREAM && parameterClasses.length 
!= 1) || parameterClasses[0] == null) {
                 return true;
             }
 
@@ -129,10 +131,8 @@ public class MethodDescriptor {
                         }
                     }
                 }
-
                 clazz = clazz.getSuperclass();
             }
-
             return true;
         }
     }
@@ -182,10 +182,7 @@ public class MethodDescriptor {
     }
 
     public enum RpcType {
-        UNARY_WRAP,
-        UNARY_UNWRAP,
-        STREAM_WRAP,
-        STREAM_UNWRAP;
+        UNARY, SERVER_STREAM, CLIENT_STREAM, BIDIRECTIONAL_STREAM
     }
 
 }
diff --git 
a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java 
b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
index 534e104..98d2dc7 100644
--- 
a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
+++ 
b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.descriptor;
 
+import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.proto.HelloReply;
 
 public interface DescriptorService {
@@ -29,4 +30,8 @@ public interface DescriptorService {
      * @return
      */
     HelloReply sayHello(HelloReply reply);
+
+    void sayHelloServerStream(HelloReply request, StreamObserver<HelloReply> 
reply);
+
+    void sayHelloServerStream2(Object request, StreamObserver<Object> reply);
 }
diff --git 
a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
 
b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
index 37565c1..ef42c79 100644
--- 
a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
+++ 
b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.descriptor;
 
+import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.proto.HelloReply;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
 
@@ -58,6 +59,19 @@ public class MethodDescriptorTest {
     }
 
     @Test
+    public void testIsServerStream() throws NoSuchMethodException {
+        Method method = 
DescriptorService.class.getMethod("sayHelloServerStream", HelloReply.class, 
StreamObserver.class);
+        MethodDescriptor descriptor = new MethodDescriptor(method);
+        Assertions.assertFalse(descriptor.isUnary());
+        Assertions.assertFalse(descriptor.isNeedWrap());
+
+        Method method2 = 
DescriptorService.class.getMethod("sayHelloServerStream2", Object.class, 
StreamObserver.class);
+        MethodDescriptor descriptor2 = new MethodDescriptor(method2);
+        Assertions.assertFalse(descriptor2.isUnary());
+        Assertions.assertTrue(descriptor2.isNeedWrap());
+    }
+
+    @Test
     public void testIsNeedWrap() throws NoSuchMethodException {
         Method method = DescriptorService.class.getMethod("noParameterMethod");
         MethodDescriptor descriptor = new MethodDescriptor(method);
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 7209b5b..98f3ec3 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
 
 public class ServerStream extends AbstractServerStream implements Stream {
     protected ServerStream(URL url) {
@@ -72,6 +73,9 @@ public class ServerStream extends AbstractServerStream 
implements Stream {
         @Override
         public void onMetadata(Metadata metadata, boolean endStream, 
OperationHandler handler) {
             super.onMetadata(metadata, endStream, handler);
+            if (getMethodDescriptor().getRpcType() == 
MethodDescriptor.RpcType.SERVER_STREAM) {
+                return;
+            }
             final RpcInvocation inv = buildInvocation(metadata);
             inv.setArguments(new Object[]{asStreamObserver()});
             final Result result = getInvoker().invoke(inv);
@@ -88,7 +92,13 @@ public class ServerStream extends AbstractServerStream 
implements Stream {
             try {
                 final Object[] arguments = deserializeRequest(in);
                 if (arguments != null) {
-                    getStreamSubscriber().onNext(arguments[0]);
+                    if (getMethodDescriptor().getRpcType() == 
MethodDescriptor.RpcType.SERVER_STREAM) {
+                        final RpcInvocation inv = 
buildInvocation(getHeaders());
+                        inv.setArguments(new Object[]{arguments[0], 
asStreamObserver()});
+                        getInvoker().invoke(inv);
+                    } else {
+                        getStreamSubscriber().onNext(arguments[0]);
+                    }
                 }
             } catch (Throwable t) {
                 transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
@@ -99,6 +109,9 @@ public class ServerStream extends AbstractServerStream 
implements Stream {
 
         @Override
         public void onComplete(OperationHandler handler) {
+            if (getMethodDescriptor().getRpcType() == 
MethodDescriptor.RpcType.SERVER_STREAM) {
+                return;
+            }
             getStreamSubscriber().onCompleted();
         }
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index 3600143..ac13126 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -16,12 +16,6 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.Http2GoAwayFrame;
-import io.netty.handler.codec.http2.Http2SettingsFrame;
-import io.netty.util.ReferenceCountUtil;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.stream.StreamObserver;
@@ -39,6 +33,13 @@ import org.apache.dubbo.rpc.model.ConsumerModel;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
 import org.apache.dubbo.rpc.model.ServiceRepository;
 
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.Http2GoAwayFrame;
+import io.netty.handler.codec.http2.Http2SettingsFrame;
+import io.netty.util.ReferenceCountUtil;
+
 public class TripleClientHandler extends ChannelDuplexHandler {
 
     @Override
@@ -99,10 +100,20 @@ public class TripleClientHandler extends 
ChannelDuplexHandler {
             stream.asStreamObserver().onNext(inv);
             stream.asStreamObserver().onCompleted();
         } else {
-            final StreamObserver<Object> streamObserver = 
(StreamObserver<Object>) inv.getArguments()[0];
-            stream.subscribe(streamObserver);
             Response response = new Response(req.getId(), req.getVersion());
-            final AppResponse result = new 
AppResponse(stream.asStreamObserver());
+            AppResponse result;
+            if (methodDescriptor.getRpcType() == 
MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
+                    || methodDescriptor.getRpcType() == 
MethodDescriptor.RpcType.CLIENT_STREAM) {
+                final StreamObserver<Object> streamObserver = 
(StreamObserver<Object>) inv.getArguments()[0];
+                stream.subscribe(streamObserver);
+                result = new AppResponse(stream.asStreamObserver());
+            } else {
+                final StreamObserver<Object> streamObserver = 
(StreamObserver<Object>) inv.getArguments()[1];
+                stream.subscribe(streamObserver);
+                result = new AppResponse();
+                stream.asStreamObserver().onNext(inv.getArguments()[0]);
+                stream.asStreamObserver().onCompleted();
+            }
             response.setResult(result);
             DefaultFuture2.received(stream.getConnection(), response);
         }

Reply via email to