This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 0d88f62830 fix:onnext callback method would be invalid when the 
provider method with triple protocol is overloaded. (#13057)
0d88f62830 is described below

commit 0d88f6283033e2389cdd4c044da0d67a6f6c6ed1
Author: KitwahSin <[email protected]>
AuthorDate: Wed Sep 27 15:31:25 2023 +0800

    fix:onnext callback method would be invalid when the provider method with 
triple protocol is overloaded. (#13057)
    
    * For the same method, rpcType.BI_STREAM and rpcType.SERVER_STREAM cannot 
exist at the same time.
    
    * remove unused imported item.
    
    * add exception test.
    
    * add licence comment.
    
    ---------
    
    Co-authored-by: Albumen Kevin <[email protected]>
---
 .../rpc/model/ReflectionServiceDescriptor.java     | 13 ++++++++-
 .../rpc/model/ReflectionServiceDescriptorTest.java | 11 +++++++
 .../org/apache/dubbo/rpc/support/DemoService1.java | 25 ++++++++++++++++
 .../apache/dubbo/rpc/support/DemoService1Impl.java | 34 ++++++++++++++++++++++
 4 files changed, 82 insertions(+), 1 deletion(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptor.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptor.java
index 629d6c0385..489f7e4c6c 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptor.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptor.java
@@ -75,7 +75,18 @@ public class ReflectionServiceDescriptor implements 
ServiceDescriptor {
 
         methods.forEach((methodName, methodList) -> {
             Map<String, MethodDescriptor> descMap = 
descToMethods.computeIfAbsent(methodName, k -> new HashMap<>());
-            methodList.forEach(methodModel -> 
descMap.put(methodModel.getParamDesc(), methodModel));
+            // not support BI_STREAM and SERVER_STREAM at the same time, for 
example,
+            // void foo(Request, StreamObserver<Response>)  ---> SERVER_STREAM
+            // StreamObserver<Response> foo(StreamObserver<Request>)   ---> 
BI_STREAM
+            long streamMethodCount =  methodList.stream()
+                .peek(methodModel -> descMap.put(methodModel.getParamDesc(), 
methodModel))
+                .map(MethodDescriptor::getRpcType)
+                .filter(rpcType -> rpcType == 
MethodDescriptor.RpcType.SERVER_STREAM
+                    || rpcType == MethodDescriptor.RpcType.BI_STREAM)
+                .count();
+            if (streamMethodCount > 1L)
+                throw new IllegalStateException("Stream method could not be 
overloaded.There are " + streamMethodCount
+                    +" stream method signatures. method(" + methodName + ")");
         });
     }
 
diff --git 
a/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptorTest.java
 
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptorTest.java
index 3361312e1f..b7011526be 100644
--- 
a/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptorTest.java
+++ 
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptorTest.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.utils.ReflectUtils;
 import org.apache.dubbo.metadata.definition.TypeDefinitionBuilder;
 import org.apache.dubbo.rpc.support.DemoService;
 
+import org.apache.dubbo.rpc.support.DemoService1;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
@@ -42,6 +43,16 @@ class ReflectionServiceDescriptorTest {
         Assertions.assertEquals(1, service2.getMethods("sayHello2").size());
     }
 
+    @Test
+    void testStreamRpcTypeException() {
+        try {
+            new ReflectionServiceDescriptor(DemoService1.class);
+        } catch (IllegalStateException e) {
+            Assertions.assertTrue(e.getMessage()
+                .contains("Stream method could not be overloaded."));
+        }
+    }
+
     @Test
     void getFullServiceDefinition() {
         TypeDefinitionBuilder.initBuilders(new FrameworkModel());
diff --git 
a/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1.java 
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1.java
new file mode 100644
index 0000000000..1251c7cce9
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.support;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+
+public interface DemoService1 {
+    StreamObserver<String> sayHello(StreamObserver<String> request);
+
+    void sayHello(String msg, StreamObserver<String> request);
+}
diff --git 
a/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1Impl.java 
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1Impl.java
new file mode 100644
index 0000000000..7b5ec906fb
--- /dev/null
+++ 
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1Impl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.support;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+
+public class DemoService1Impl implements DemoService1{
+    @Override
+    public StreamObserver<String> sayHello(StreamObserver<String> request) {
+        request.onNext("BI_STREAM");
+        return request;
+    }
+
+    @Override
+    public void sayHello(String msg, StreamObserver<String> request) {
+        request.onNext(msg);
+        request.onNext("SERVER_STREAM");
+        request.onCompleted();
+    }
+}

Reply via email to