This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new b5e65a6 Support attachment when triple's response is an exception
(#8488)
b5e65a6 is described below
commit b5e65a6d25758bd459a5d8898feeaae32f6c388f
Author: GuoHao <[email protected]>
AuthorDate: Thu Aug 12 21:43:22 2021 +0800
Support attachment when triple's response is an exception (#8488)
---
.../dubbo/rpc/protocol/tri/AbstractStream.java | 10 +++++++--
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 26 +++++++++++-----------
2 files changed, 21 insertions(+), 15 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index a807314..5934871 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -185,19 +185,25 @@ public abstract class AbstractStream implements Stream {
public TransportObserver asTransportObserver() {
return transportObserver;
}
-
- protected void transportError(GrpcStatus status) {
+ protected void transportError(GrpcStatus status, Map<String,Object>
attachments) {
// set metadata
Metadata metadata = getMetaData(status);
getTransportSubscriber().tryOnMetadata(metadata, false);
// set trailers
Metadata trailers = getTrailers(status);
+ if (attachments != null) {
+ convertAttachment(trailers, attachments);
+ }
getTransportSubscriber().tryOnMetadata(trailers, true);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("[Triple-Server-Error] " + status.toMessage());
}
}
+ protected void transportError(GrpcStatus status) {
+ transportError(status,null);
+ }
+
protected void transportError(Throwable throwable) {
GrpcStatus status = new GrpcStatus(Code.UNKNOWN, throwable,
throwable.getMessage());
Metadata metadata = getMetaData(status);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index c3a447e..e7cb9fb 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -86,23 +86,23 @@ public class UnaryServerStream extends AbstractServerStream
implements Stream {
CompletionStage<Object> future =
result.thenApply(Function.identity());
BiConsumer<Object, Throwable> onComplete = (appResult, t) -> {
- try {
- if (t != null) {
- if (t instanceof TimeoutException) {
-
transportError(GrpcStatus.fromCode(GrpcStatus.Code.DEADLINE_EXCEEDED).withCause(t));
- } else {
-
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN).withCause(t));
- }
- return;
+ if (t != null) {
+ if (t instanceof TimeoutException) {
+
transportError(GrpcStatus.fromCode(GrpcStatus.Code.DEADLINE_EXCEEDED).withCause(t));
+ } else {
+
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN).withCause(t));
}
- AppResponse response = (AppResponse) appResult;
+ return;
+ }
+ AppResponse response = (AppResponse) appResult;
+ try {
if (response.hasException()) {
final Throwable exception = response.getException();
if (exception instanceof TripleRpcException) {
- transportError(((TripleRpcException)
exception).getStatus());
+ transportError(((TripleRpcException)
exception).getStatus(), response.getObjectAttachments());
} else {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
- .withCause(exception));
+ .withCause(exception),
response.getObjectAttachments());
}
return;
}
@@ -131,11 +131,11 @@ public class UnaryServerStream extends
AbstractServerStream implements Stream {
} catch (Throwable e) {
LOGGER.warn("Exception processing triple message", e);
if (e instanceof TripleRpcException) {
- transportError(((TripleRpcException) e).getStatus());
+ transportError(((TripleRpcException) e).getStatus(),
response.getObjectAttachments());
} else {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
.withDescription("Exception occurred in
provider's execution:" + e.getMessage())
- .withCause(e));
+ .withCause(e),
response.getObjectAttachments());
}
}
};