This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 0d88f62830 fix: onNext callback method would be invalid when the
provider method with triple protocol is overloaded. (#13057)
0d88f62830 is described below
commit 0d88f6283033e2389cdd4c044da0d67a6f6c6ed1
Author: KitwahSin <[email protected]>
AuthorDate: Wed Sep 27 15:31:25 2023 +0800
fix: onNext callback method would be invalid when the provider method with
triple protocol is overloaded. (#13057)
* For the same method, rpcType.BI_STREAM and rpcType.SERVER_STREAM cannot
exist at the same time.
* remove unused imported item.
* add exception test.
* add licence comment.
---------
Co-authored-by: Albumen Kevin <[email protected]>
---
.../rpc/model/ReflectionServiceDescriptor.java | 13 ++++++++-
.../rpc/model/ReflectionServiceDescriptorTest.java | 11 +++++++
.../org/apache/dubbo/rpc/support/DemoService1.java | 25 ++++++++++++++++
.../apache/dubbo/rpc/support/DemoService1Impl.java | 34 ++++++++++++++++++++++
4 files changed, 82 insertions(+), 1 deletion(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptor.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptor.java
index 629d6c0385..489f7e4c6c 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptor.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptor.java
@@ -75,7 +75,18 @@ public class ReflectionServiceDescriptor implements
ServiceDescriptor {
methods.forEach((methodName, methodList) -> {
Map<String, MethodDescriptor> descMap =
descToMethods.computeIfAbsent(methodName, k -> new HashMap<>());
- methodList.forEach(methodModel ->
descMap.put(methodModel.getParamDesc(), methodModel));
+ // BI_STREAM and SERVER_STREAM overloads of the same method name are not supported; for example:
+ // void foo(Request, StreamObserver<Response>) ---> SERVER_STREAM
+ // StreamObserver<Response> foo(StreamObserver<Request>) --->
BI_STREAM
+ long streamMethodCount = methodList.stream()
+ .peek(methodModel -> descMap.put(methodModel.getParamDesc(),
methodModel))
+ .map(MethodDescriptor::getRpcType)
+ .filter(rpcType -> rpcType ==
MethodDescriptor.RpcType.SERVER_STREAM
+ || rpcType == MethodDescriptor.RpcType.BI_STREAM)
+ .count();
+ if (streamMethodCount > 1L)
+ throw new IllegalStateException("Stream method could not be
overloaded.There are " + streamMethodCount
+ +" stream method signatures. method(" + methodName + ")");
});
}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptorTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptorTest.java
index 3361312e1f..b7011526be 100644
---
a/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptorTest.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/model/ReflectionServiceDescriptorTest.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.metadata.definition.TypeDefinitionBuilder;
import org.apache.dubbo.rpc.support.DemoService;
+import org.apache.dubbo.rpc.support.DemoService1;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -42,6 +43,16 @@ class ReflectionServiceDescriptorTest {
Assertions.assertEquals(1, service2.getMethods("sayHello2").size());
}
+ @Test
+ void testStreamRpcTypeException() {
+     // DemoService1 overloads sayHello with two StreamObserver-based signatures, so building
+     // the descriptor must fail. assertThrows makes the test fail when no exception is raised;
+     // a bare try/catch would silently pass in that case.
+     IllegalStateException e = Assertions.assertThrows(IllegalStateException.class,
+         () -> new ReflectionServiceDescriptor(DemoService1.class));
+     Assertions.assertTrue(e.getMessage()
+         .contains("Stream method could not be overloaded."));
+ }
+
@Test
void getFullServiceDefinition() {
TypeDefinitionBuilder.initBuilders(new FrameworkModel());
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1.java
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1.java
new file mode 100644
index 0000000000..1251c7cce9
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.support;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+
+/**
+ * Test fixture that overloads {@code sayHello} with two signatures that both involve a
+ * {@link StreamObserver}. ReflectionServiceDescriptorTest passes this interface to
+ * {@code ReflectionServiceDescriptor} and checks the resulting "Stream method could not be
+ * overloaded." error.
+ */
+public interface DemoService1 {
+ /**
+  * Overload that takes an observer and returns an observer; this matches the
+  * {@code StreamObserver<Response> foo(StreamObserver<Request>)} (BI_STREAM) shape.
+  */
+ StreamObserver<String> sayHello(StreamObserver<String> request);
+
+ /**
+  * Overload that takes a message plus an observer and returns nothing; this matches the
+  * {@code void foo(Request, StreamObserver<Response>)} (SERVER_STREAM) shape.
+  */
+ void sayHello(String msg, StreamObserver<String> request);
+}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1Impl.java
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1Impl.java
new file mode 100644
index 0000000000..7b5ec906fb
--- /dev/null
+++
b/dubbo-common/src/test/java/org/apache/dubbo/rpc/support/DemoService1Impl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.support;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+
+/**
+ * Minimal implementation of {@link DemoService1}; each overload pushes a marker string
+ * naming the streaming shape it represents.
+ */
+public class DemoService1Impl implements DemoService1 {
+
+ /**
+  * Pushes the literal {@code "BI_STREAM"} to the supplied observer and hands that same
+  * observer back to the caller.
+  */
+ @Override
+ public StreamObserver<String> sayHello(StreamObserver<String> request) {
+     request.onNext("BI_STREAM");
+     return request;
+ }
+
+ /**
+  * Pushes {@code msg}, then the literal {@code "SERVER_STREAM"}, to the supplied observer
+  * and finally completes it.
+  */
+ @Override
+ public void sayHello(String msg, StreamObserver<String> request) {
+     request.onNext(msg);
+     request.onNext("SERVER_STREAM");
+     request.onCompleted();
+ }
+}