This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 7f91631 [3.0-Triple] Support triple server stream (#8542)
7f91631 is described below
commit 7f91631b447b68d9a9381b9f75b84c1feead1c91
Author: GuoHao <[email protected]>
AuthorDate: Mon Aug 23 10:48:16 2021 +0800
[3.0-Triple] Support triple server stream (#8542)
* Support triple server stream
* Fix provider NPE
---
.../apache/dubbo/rpc/model/MethodDescriptor.java | 41 ++++++++++------------
.../apache/dubbo/descriptor/DescriptorService.java | 5 +++
.../dubbo/descriptor/MethodDescriptorTest.java | 14 ++++++++
.../dubbo/rpc/protocol/tri/ServerStream.java | 15 +++++++-
.../rpc/protocol/tri/TripleClientHandler.java | 29 ++++++++++-----
5 files changed, 72 insertions(+), 32 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
index b49aff0..8ce071a 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/MethodDescriptor.java
@@ -38,6 +38,7 @@ import static
org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE
*
*/
public class MethodDescriptor {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodDescriptor.class);
private final Method method;
// private final boolean isCallBack;
// private final boolean isFuture;
@@ -49,10 +50,9 @@ public class MethodDescriptor {
private final Type[] returnTypes;
private final String methodName;
private final boolean generic;
+ private final boolean wrap;
private final RpcType rpcType;
-
private final ConcurrentMap<String, Object> attributeMap = new
ConcurrentHashMap<>();
- private static final Logger logger =
LoggerFactory.getLogger(MethodDescriptor.class);
public MethodDescriptor(Method method) {
this.method = method;
@@ -63,20 +63,18 @@ public class MethodDescriptor {
(Class<?>) ((ParameterizedType)
method.getGenericReturnType()).getActualTypeArguments()[0]};
this.returnClass = (Class<?>) ((ParameterizedType)
method.getGenericParameterTypes()[0])
.getActualTypeArguments()[0];
- if (needWrap()) {
- rpcType = RpcType.STREAM_WRAP;
- } else {
- rpcType = RpcType.STREAM_UNWRAP;
- }
+ this.rpcType = RpcType.BIDIRECTIONAL_STREAM;
+ } else if (parameterTypes.length == 2 &&
method.getReturnType().equals(Void.TYPE)
+ && !isStreamType(parameterTypes[0]) &&
isStreamType(parameterTypes[1])) {
+ this.parameterClasses = method.getParameterTypes();
+ this.returnClass = (Class<?>)
((ParameterizedType)method.getGenericParameterTypes()[1]).getActualTypeArguments()[0];
+ this.rpcType = RpcType.SERVER_STREAM;
} else {
this.parameterClasses = method.getParameterTypes();
this.returnClass = method.getReturnType();
- if (needWrap()) {
- rpcType = RpcType.UNARY_WRAP;
- } else {
- rpcType = RpcType.UNARY_UNWRAP;
- }
+ this.rpcType = RpcType.UNARY;
}
+ this.wrap = needWrap();
Type[] returnTypesResult;
try {
returnTypesResult = ReflectUtils.getReturnTypes(method);
@@ -98,15 +96,19 @@ public class MethodDescriptor {
}
public boolean isStream() {
- return rpcType.equals(RpcType.STREAM_WRAP) ||
rpcType.equals(RpcType.STREAM_UNWRAP);
+ return rpcType.equals(RpcType.SERVER_STREAM) ||
rpcType.equals(RpcType.BIDIRECTIONAL_STREAM) ||
rpcType.equals(RpcType.CLIENT_STREAM);
}
public boolean isUnary() {
- return rpcType.equals(RpcType.UNARY_WRAP) ||
rpcType.equals(RpcType.UNARY_UNWRAP);
+ return rpcType.equals(RpcType.UNARY);
}
public boolean isNeedWrap() {
- return rpcType.equals(RpcType.UNARY_WRAP) ||
rpcType.equals(RpcType.STREAM_WRAP);
+ return wrap;
+ }
+
+ public RpcType getRpcType() {
+ return rpcType;
}
private boolean needWrap() {
@@ -115,7 +117,7 @@ public class MethodDescriptor {
} else if ($ECHO.equals(methodName)) {
return true;
} else {
- if (parameterClasses.length != 1 || parameterClasses[0] == null) {
+ if ((rpcType != RpcType.SERVER_STREAM && parameterClasses.length
!= 1) || parameterClasses[0] == null) {
return true;
}
@@ -129,10 +131,8 @@ public class MethodDescriptor {
}
}
}
-
clazz = clazz.getSuperclass();
}
-
return true;
}
}
@@ -182,10 +182,7 @@ public class MethodDescriptor {
}
public enum RpcType {
- UNARY_WRAP,
- UNARY_UNWRAP,
- STREAM_WRAP,
- STREAM_UNWRAP;
+ UNARY, SERVER_STREAM, CLIENT_STREAM, BIDIRECTIONAL_STREAM
}
}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
index 534e104..98d2dc7 100644
---
a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/DescriptorService.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.descriptor;
+import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.proto.HelloReply;
public interface DescriptorService {
@@ -29,4 +30,8 @@ public interface DescriptorService {
* @return
*/
HelloReply sayHello(HelloReply reply);
+
+ void sayHelloServerStream(HelloReply request, StreamObserver<HelloReply>
reply);
+
+ void sayHelloServerStream2(Object request, StreamObserver<Object> reply);
}
diff --git
a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
index 37565c1..ef42c79 100644
---
a/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
+++
b/dubbo-common/src/test/java/org/apache/dubbo/descriptor/MethodDescriptorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.descriptor;
+import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.proto.HelloReply;
import org.apache.dubbo.rpc.model.MethodDescriptor;
@@ -58,6 +59,19 @@ public class MethodDescriptorTest {
}
@Test
+ public void testIsServerStream() throws NoSuchMethodException {
+ Method method =
DescriptorService.class.getMethod("sayHelloServerStream", HelloReply.class,
StreamObserver.class);
+ MethodDescriptor descriptor = new MethodDescriptor(method);
+ Assertions.assertFalse(descriptor.isUnary());
+ Assertions.assertFalse(descriptor.isNeedWrap());
+
+ Method method2 =
DescriptorService.class.getMethod("sayHelloServerStream2", Object.class,
StreamObserver.class);
+ MethodDescriptor descriptor2 = new MethodDescriptor(method2);
+ Assertions.assertFalse(descriptor2.isUnary());
+ Assertions.assertTrue(descriptor2.isNeedWrap());
+ }
+
+ @Test
public void testIsNeedWrap() throws NoSuchMethodException {
Method method = DescriptorService.class.getMethod("noParameterMethod");
MethodDescriptor descriptor = new MethodDescriptor(method);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 7209b5b..98f3ec3 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.model.MethodDescriptor;
public class ServerStream extends AbstractServerStream implements Stream {
protected ServerStream(URL url) {
@@ -72,6 +73,9 @@ public class ServerStream extends AbstractServerStream
implements Stream {
@Override
public void onMetadata(Metadata metadata, boolean endStream,
OperationHandler handler) {
super.onMetadata(metadata, endStream, handler);
+ if (getMethodDescriptor().getRpcType() ==
MethodDescriptor.RpcType.SERVER_STREAM) {
+ return;
+ }
final RpcInvocation inv = buildInvocation(metadata);
inv.setArguments(new Object[]{asStreamObserver()});
final Result result = getInvoker().invoke(inv);
@@ -88,7 +92,13 @@ public class ServerStream extends AbstractServerStream
implements Stream {
try {
final Object[] arguments = deserializeRequest(in);
if (arguments != null) {
- getStreamSubscriber().onNext(arguments[0]);
+ if (getMethodDescriptor().getRpcType() ==
MethodDescriptor.RpcType.SERVER_STREAM) {
+ final RpcInvocation inv =
buildInvocation(getHeaders());
+ inv.setArguments(new Object[]{arguments[0],
asStreamObserver()});
+ getInvoker().invoke(inv);
+ } else {
+ getStreamSubscriber().onNext(arguments[0]);
+ }
}
} catch (Throwable t) {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
@@ -99,6 +109,9 @@ public class ServerStream extends AbstractServerStream
implements Stream {
@Override
public void onComplete(OperationHandler handler) {
+ if (getMethodDescriptor().getRpcType() ==
MethodDescriptor.RpcType.SERVER_STREAM) {
+ return;
+ }
getStreamSubscriber().onCompleted();
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index 3600143..ac13126 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -16,12 +16,6 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http2.Http2GoAwayFrame;
-import io.netty.handler.codec.http2.Http2SettingsFrame;
-import io.netty.util.ReferenceCountUtil;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.stream.StreamObserver;
@@ -39,6 +33,13 @@ import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceRepository;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.Http2GoAwayFrame;
+import io.netty.handler.codec.http2.Http2SettingsFrame;
+import io.netty.util.ReferenceCountUtil;
+
public class TripleClientHandler extends ChannelDuplexHandler {
@Override
@@ -99,10 +100,20 @@ public class TripleClientHandler extends
ChannelDuplexHandler {
stream.asStreamObserver().onNext(inv);
stream.asStreamObserver().onCompleted();
} else {
- final StreamObserver<Object> streamObserver =
(StreamObserver<Object>) inv.getArguments()[0];
- stream.subscribe(streamObserver);
Response response = new Response(req.getId(), req.getVersion());
- final AppResponse result = new
AppResponse(stream.asStreamObserver());
+ AppResponse result;
+ if (methodDescriptor.getRpcType() ==
MethodDescriptor.RpcType.BIDIRECTIONAL_STREAM
+ || methodDescriptor.getRpcType() ==
MethodDescriptor.RpcType.CLIENT_STREAM) {
+ final StreamObserver<Object> streamObserver =
(StreamObserver<Object>) inv.getArguments()[0];
+ stream.subscribe(streamObserver);
+ result = new AppResponse(stream.asStreamObserver());
+ } else {
+ final StreamObserver<Object> streamObserver =
(StreamObserver<Object>) inv.getArguments()[1];
+ stream.subscribe(streamObserver);
+ result = new AppResponse();
+ stream.asStreamObserver().onNext(inv.getArguments()[0]);
+ stream.asStreamObserver().onCompleted();
+ }
response.setResult(result);
DefaultFuture2.received(stream.getConnection(), response);
}