This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new fe50ac5d79 Remove grpc-status-details when no pb exists (#11249)
fe50ac5d79 is described below
commit fe50ac5d794243f60f2e824314894e9a74977368
Author: earthchen <[email protected]>
AuthorDate: Thu Jan 12 14:54:33 2023 +0800
Remove grpc-status-details when no pb exists (#11249)
* remove pb status
* fix
* fix
* fix
* fix
---
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 2 +-
.../rpc/protocol/tri/call/TripleClientCall.java | 73 +-------------------
.../rpc/protocol/tri/stream/AbstractStream.java | 19 ++++++
.../protocol/tri/stream/TripleClientStream.java | 78 +++++++++++++++++++++-
.../protocol/tri/stream/TripleServerStream.java | 3 +
5 files changed, 101 insertions(+), 74 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 2c0024d0ec..16bd6de96e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -60,7 +60,6 @@ public class TripleProtocol extends AbstractProtocol {
*/
public static boolean CONVERT_NO_LOWER_HEADER = false;
-
public TripleProtocol(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
this.triBuiltinService = new TriBuiltinService(frameworkModel);
@@ -80,6 +79,7 @@ public class TripleProtocol extends AbstractProtocol {
return 50051;
}
+
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index 1792fc973e..a7d5e39d80 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -23,10 +23,7 @@ import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.api.Connection;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
-import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
import org.apache.dubbo.rpc.protocol.tri.RequestMetadata;
-import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
@@ -34,14 +31,6 @@ import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
import org.apache.dubbo.rpc.protocol.tri.stream.TripleClientStream;
-import com.google.protobuf.Any;
-import com.google.rpc.DebugInfo;
-import com.google.rpc.ErrorInfo;
-import com.google.rpc.Status;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -111,15 +100,8 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
return;
}
done = true;
- final TriRpcStatus detailStatus;
- final TriRpcStatus statusFromTrailers =
getStatusFromTrailers(excludeHeaders);
- if (statusFromTrailers != null) {
- detailStatus = statusFromTrailers;
- } else {
- detailStatus = status;
- }
try {
- listener.onClose(detailStatus,
StreamUtils.toAttachments(attachments));
+ listener.onClose(status, StreamUtils.toAttachments(attachments));
} catch (Throwable t) {
cancelByLocal(
TriRpcStatus.INTERNAL.withDescription("Close stream
error").withCause(t)
@@ -130,59 +112,6 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
}
}
- private TriRpcStatus getStatusFromTrailers(Map<String, String> metadata) {
- if (null == metadata) {
- return null;
- }
- // second get status detail
- if
(!metadata.containsKey(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader())) {
- return null;
- }
- final String raw =
(metadata.remove(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader()));
- byte[] statusDetailBin = StreamUtils.decodeASCIIByte(raw);
- ClassLoader tccl = Thread.currentThread().getContextClassLoader();
- try {
- final Status statusDetail = Status.parseFrom(statusDetailBin);
- List<Any> detailList = statusDetail.getDetailsList();
- Map<Class<?>, Object> classObjectMap =
tranFromStatusDetails(detailList);
-
- // get common exception from DebugInfo
- TriRpcStatus status = TriRpcStatus.fromCode(statusDetail.getCode())
-
.withDescription(TriRpcStatus.decodeMessage(statusDetail.getMessage()));
- DebugInfo debugInfo = (DebugInfo)
classObjectMap.get(DebugInfo.class);
- if (debugInfo != null) {
- String msg = ExceptionUtils.getStackFrameString(
- debugInfo.getStackEntriesList());
- status = status.appendDescription(msg);
- }
- return status;
- } catch (IOException ioException) {
- return null;
- } finally {
- ClassLoadUtil.switchContextLoader(tccl);
- }
-
- }
-
- private Map<Class<?>, Object> tranFromStatusDetails(List<Any> detailList) {
- Map<Class<?>, Object> map = new HashMap<>(detailList.size());
- try {
- for (Any any : detailList) {
- if (any.is(ErrorInfo.class)) {
- ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
- map.putIfAbsent(ErrorInfo.class, errorInfo);
- } else if (any.is(DebugInfo.class)) {
- DebugInfo debugInfo = any.unpack(DebugInfo.class);
- map.putIfAbsent(DebugInfo.class, debugInfo);
- }
- // support others type but now only support this
- }
- } catch (Throwable t) {
- LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", "tran from
grpc-status-details error", t);
- }
- return map;
- }
-
@Override
public void onStart() {
listener.onStart(TripleClientCall.this);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
index b9496c0c1a..afa8bdc51b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractStream.java
@@ -18,6 +18,7 @@
package org.apache.dubbo.rpc.protocol.tri.stream;
import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
+import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.rpc.model.FrameworkModel;
import java.util.concurrent.Executor;
@@ -30,8 +31,26 @@ public abstract class AbstractStream implements Stream {
protected final Executor executor;
protected final FrameworkModel frameworkModel;
+
+ private static final boolean HAS_PROTOBUF = hasProtobuf();
+
public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
this.executor = new SerializingExecutor(executor);
this.frameworkModel = frameworkModel;
}
+
+
+ public static boolean getGrpcStatusDetailEnabled() {
+ return HAS_PROTOBUF;
+ }
+
+
+ private static boolean hasProtobuf() {
+ try {
+ ClassUtils.forName("com.google.protobuf.Message");
+ return true;
+ } catch (ClassNotFoundException ignore) {
+ return false;
+ }
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
index 27efb969ba..a441196bf4 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
@@ -21,6 +21,8 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
+import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
@@ -36,6 +38,10 @@ import
org.apache.dubbo.rpc.protocol.tri.transport.TripleCommandOutBoundHandler;
import
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2ClientResponseHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.WriteQueue;
+import com.google.protobuf.Any;
+import com.google.rpc.DebugInfo;
+import com.google.rpc.ErrorInfo;
+import com.google.rpc.Status;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -46,11 +52,16 @@ import
io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
+import java.io.IOException;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
+import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_RESPONSE;
+
/**
* ClientStream is an abstraction for bi-directional messaging. It maintains a
{@link WriteQueue} to
@@ -195,7 +206,14 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
final Map<String, Object> attachments = headersToMap(trailers, ()
-> {
return
reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader());
});
- listener.onComplete(status, attachments, reserved);
+ final TriRpcStatus detailStatus;
+ final TriRpcStatus statusFromTrailers =
getStatusFromTrailers(reserved);
+ if (statusFromTrailers != null) {
+ detailStatus = statusFromTrailers;
+ } else {
+ detailStatus = status;
+ }
+ listener.onComplete(detailStatus, attachments, reserved);
}
private TriRpcStatus validateHeaderStatus(Http2Headers headers) {
@@ -311,6 +329,64 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
"missing GRPC status, inferred error from HTTP status code");
}
+
+ private TriRpcStatus getStatusFromTrailers(Map<String, String>
metadata) {
+ if (null == metadata) {
+ return null;
+ }
+ if (!getGrpcStatusDetailEnabled()){
+ return null;
+ }
+ // second get status detail
+ if
(!metadata.containsKey(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader())) {
+ return null;
+ }
+ final String raw =
(metadata.remove(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader()));
+ byte[] statusDetailBin = StreamUtils.decodeASCIIByte(raw);
+ ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+ try {
+ final Status statusDetail = Status.parseFrom(statusDetailBin);
+ List<Any> detailList = statusDetail.getDetailsList();
+ Map<Class<?>, Object> classObjectMap =
tranFromStatusDetails(detailList);
+
+ // get common exception from DebugInfo
+ TriRpcStatus status =
TriRpcStatus.fromCode(statusDetail.getCode())
+
.withDescription(TriRpcStatus.decodeMessage(statusDetail.getMessage()));
+ DebugInfo debugInfo = (DebugInfo)
classObjectMap.get(DebugInfo.class);
+ if (debugInfo != null) {
+ String msg = ExceptionUtils.getStackFrameString(
+ debugInfo.getStackEntriesList());
+ status = status.appendDescription(msg);
+ }
+ return status;
+ } catch (IOException ioException) {
+ return null;
+ } finally {
+ ClassLoadUtil.switchContextLoader(tccl);
+ }
+
+ }
+
+
+ private Map<Class<?>, Object> tranFromStatusDetails(List<Any>
detailList) {
+ Map<Class<?>, Object> map = new HashMap<>(detailList.size());
+ try {
+ for (Any any : detailList) {
+ if (any.is(ErrorInfo.class)) {
+ ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
+ map.putIfAbsent(ErrorInfo.class, errorInfo);
+ } else if (any.is(DebugInfo.class)) {
+ DebugInfo debugInfo = any.unpack(DebugInfo.class);
+ map.putIfAbsent(DebugInfo.class, debugInfo);
+ }
+ // support others type but now only support this
+ }
+ } catch (Throwable t) {
+ LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", "tran from
grpc-status-details error", t);
+ }
+ return map;
+ }
+
@Override
public void onHeader(Http2Headers headers, boolean endStream) {
executor.execute(() -> {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
index 0c0cfe5eb2..84e02e4661 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
@@ -182,6 +182,9 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
String grpcMessage = getGrpcMessage(rpcStatus);
grpcMessage =
TriRpcStatus.encodeMessage(TriRpcStatus.limitSizeTo1KB(grpcMessage));
headers.set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), grpcMessage);
+ if (!getGrpcStatusDetailEnabled()) {
+ return headers;
+ }
Status.Builder builder =
Status.newBuilder().setCode(rpcStatus.code.code)
.setMessage(grpcMessage);
Throwable throwable = rpcStatus.cause;