This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 33af482  Format triple (#9058)
33af482 is described below

commit 33af482ce223999ebcc4cec0c04300767168829e
Author: GuoHao <[email protected]>
AuthorDate: Mon Oct 18 11:41:52 2021 +0800

    Format triple (#9058)
---
 .../rpc/protocol/tri/AbstractServerStream.java     | 34 +++++++-------
 .../dubbo/rpc/protocol/tri/AbstractStream.java     | 33 ++++++-------
 .../rpc/protocol/tri/CancelableStreamObserver.java |  4 +-
 .../dubbo/rpc/protocol/tri/ClientStream.java       | 54 +++++++++++-----------
 .../apache/dubbo/rpc/protocol/tri/Compressor.java  | 32 ++++++-------
 .../dubbo/rpc/protocol/tri/GracefulShutdown.java   | 10 ++--
 .../dubbo/rpc/protocol/tri/GrpcDataDecoder.java    | 14 +++---
 .../apache/dubbo/rpc/protocol/tri/GrpcStatus.java  |  5 +-
 .../rpc/protocol/tri/SingleProtobufUtils.java      | 14 +++---
 .../dubbo/rpc/protocol/tri/TransportObserver.java  | 15 +++---
 .../dubbo/rpc/protocol/tri/TripleHeaderEnum.java   | 12 ++---
 .../tri/TripleHttp2FrameServerHandler.java         |  2 +-
 .../rpc/protocol/tri/TripleHttp2Protocol.java      | 38 +++++++--------
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      | 10 ++--
 .../protocol/tri/TripleServerInboundHandler.java   |  2 +-
 .../apache/dubbo/rpc/protocol/tri/TripleUtil.java  |  5 +-
 .../dubbo/rpc/protocol/tri/UnaryClientStream.java  |  6 +--
 .../dubbo/rpc/protocol/tri/UnaryServerStream.java  |  4 +-
 .../protocol/tri/service/TriBuiltinService.java    |  2 +-
 .../rpc/protocol/tri/service/TriHealthImpl.java    | 16 +++----
 20 files changed, 149 insertions(+), 163 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 153c556..567f45a 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -47,7 +47,7 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
     private final ProviderModel providerModel;
     private List<MethodDescriptor> methodDescriptors;
     private Invoker<?> invoker;
-    private List<HeaderFilter> headerFilters;
+    private final List<HeaderFilter> headerFilters;
 
     protected AbstractServerStream(URL url) {
         this(url, lookupProviderModel(url));
@@ -69,7 +69,7 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
             return null;
         }
         return (ExecutorService) providerModel.getServiceMetadata()
-                .getAttribute(CommonConstants.THREADPOOL_KEY);
+            .getAttribute(CommonConstants.THREADPOOL_KEY);
     }
 
     public static UnaryServerStream unary(URL url) {
@@ -116,8 +116,8 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
 
     protected RpcInvocation buildInvocation(Metadata metadata) {
         RpcInvocation inv = new RpcInvocation(getUrl().getServiceModel(),
-                getMethodName(), getServiceDescriptor().getServiceName(),
-                getUrl().getProtocolServiceKey(), 
getMethodDescriptor().getParameterClasses(), new Object[0]);
+            getMethodName(), getServiceDescriptor().getServiceName(),
+            getUrl().getProtocolServiceKey(), 
getMethodDescriptor().getParameterClasses(), new Object[0]);
         inv.setTargetServiceUniqueName(getUrl().getServiceKey());
         inv.setReturnTypes(getMethodDescriptor().getReturnTypes());
 
@@ -138,12 +138,12 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
             }
             if (getMethodDescriptor() == null || 
getMethodDescriptor().isNeedWrap()) {
                 final TripleWrapper.TripleRequestWrapper wrapper = 
TripleUtil.unpack(data,
-                        TripleWrapper.TripleRequestWrapper.class);
+                    TripleWrapper.TripleRequestWrapper.class);
                 if 
(!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())))
 {
                     
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INVALID_ARGUMENT)
-                            .withDescription("Received inconsistent 
serialization type from client, " +
-                                    "reject to deserialize! Expected:" + 
getSerializeType() +
-                                    " Actual:" + 
TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())));
+                        .withDescription("Received inconsistent serialization 
type from client, " +
+                            "reject to deserialize! Expected:" + 
getSerializeType() +
+                            " Actual:" + 
TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())));
                     return null;
                 }
                 if (getMethodDescriptor() == null) {
@@ -158,8 +158,8 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
                     }
                     if (getMethodDescriptor() == null) {
                         
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
-                                .withDescription("Method :" + getMethodName() 
+ "[" + Arrays.toString(paramTypes) + "] " +
-                                        "not found of service:" + 
getServiceDescriptor().getServiceName()));
+                            .withDescription("Method :" + getMethodName() + 
"[" + Arrays.toString(paramTypes) + "] " +
+                                "not found of service:" + 
getServiceDescriptor().getServiceName()));
 
                         return null;
                     }
@@ -181,7 +181,7 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
     protected Metadata createRequestMeta() {
         Metadata metadata = new DefaultMetadata();
         metadata.putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), 
super.getCompressor().getMessageEncoding())
-                
.putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), 
Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel()));
+            .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), 
Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel()));
         return metadata;
     }
 
@@ -189,7 +189,7 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
         final com.google.protobuf.Message message;
         if (getMethodDescriptor().isNeedWrap()) {
             message = TripleUtil.wrapResp(getUrl(), getSerializeType(), value, 
getMethodDescriptor(),
-                    getMultipleSerialization());
+                getMultipleSerialization());
         } else {
             message = (Message) value;
         }
@@ -206,19 +206,19 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
                 } catch (Throwable t) {
                     LOGGER.error("Exception processing triple message", t);
                     
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                            .withDescription("Exception in invoker chain :" + 
t.getMessage())
-                            .withCause(t));
+                        .withDescription("Exception in invoker chain :" + 
t.getMessage())
+                        .withCause(t));
                 }
             });
         } catch (RejectedExecutionException e) {
             LOGGER.error("Provider's thread pool is full", e);
             
transportError(GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
-                    .withDescription("Provider's thread pool is full"));
+                .withDescription("Provider's thread pool is full"));
         } catch (Throwable t) {
             LOGGER.error("Provider submit request to thread pool error ", t);
             transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                    .withCause(t)
-                    .withDescription("Provider's error"));
+                .withCause(t)
+                .withDescription("Provider's error"));
         }
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index c6bdde6..1f4f19a 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -51,6 +51,7 @@ public abstract class AbstractStream implements Stream {
     private final StreamObserver<Object> streamObserver;
     private final TransportObserver transportObserver;
     private final Executor executor;
+    private final CancellationContext cancellationContext;
     private ServiceDescriptor serviceDescriptor;
     private MethodDescriptor methodDescriptor;
     private String methodName;
@@ -60,22 +61,12 @@ public abstract class AbstractStream implements Stream {
     private TransportObserver transportSubscriber;
     private Compressor compressor = IdentityCompressor.NONE;
     private Compressor deCompressor = IdentityCompressor.NONE;
-
-    private final CancellationContext cancellationContext;
     private volatile boolean cancelled = false;
 
-    public boolean isCancelled() {
-        return cancelled;
-    }
-
     protected AbstractStream(URL url) {
         this(url, null);
     }
 
-    protected CancellationContext getCancellationContext() {
-        return cancellationContext;
-    }
-
     protected AbstractStream(URL url, Executor executor) {
         this.url = url;
         final Executor sourceExecutor = lookupExecutor(url, executor);
@@ -88,6 +79,13 @@ public abstract class AbstractStream implements Stream {
         this.streamObserver = createStreamObserver();
     }
 
+    public boolean isCancelled() {
+        return cancelled;
+    }
+
+    protected CancellationContext getCancellationContext() {
+        return cancellationContext;
+    }
 
     private Executor lookupExecutor(URL url, Executor executor) {
         // only server maybe not null
@@ -205,6 +203,10 @@ public abstract class AbstractStream implements Stream {
         this.serviceDescriptor = serviceDescriptor;
     }
 
+    public Compressor getCompressor() {
+        return this.compressor;
+    }
+
     /**
      * set compressor if required
      *
@@ -225,6 +227,9 @@ public abstract class AbstractStream implements Stream {
         return this;
     }
 
+    public Compressor getDeCompressor() {
+        return this.deCompressor;
+    }
 
     protected AbstractStream setDeCompressor(Compressor compressor) {
         // If compressor is NULL, this will not be set.
@@ -241,14 +246,6 @@ public abstract class AbstractStream implements Stream {
         return this;
     }
 
-    public Compressor getCompressor() {
-        return this.compressor;
-    }
-
-    public Compressor getDeCompressor() {
-        return this.deCompressor;
-    }
-
     public URL getUrl() {
         return url;
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
index 379b002..7d107f9 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
@@ -27,10 +27,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public abstract class CancelableStreamObserver<T> implements StreamObserver<T> 
{
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CancelableStreamObserver.class);
-
-    private CancellationContext cancellationContext;
-
     private final AtomicBoolean contextSet = new AtomicBoolean(false);
+    private CancellationContext cancellationContext;
 
     public CancelableStreamObserver() {
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index 215eaba..42e7089 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -33,6 +33,33 @@ public class ClientStream extends AbstractClientStream 
implements Stream {
         return new ClientStreamObserverImpl(getCancellationContext());
     }
 
+    @Override
+    protected TransportObserver createTransportObserver() {
+        return new AbstractTransportObserver() {
+
+            @Override
+            public void onData(byte[] data, boolean endStream) {
+                execute(() -> {
+                    final Object resp = deserializeResponse(data);
+                    getStreamSubscriber().onNext(resp);
+                });
+            }
+
+            @Override
+            public void onComplete() {
+                execute(() -> {
+                    final GrpcStatus status = 
extractStatusFromMeta(getHeaders());
+
+                    if (GrpcStatus.Code.isOk(status.code.code)) {
+                        getStreamSubscriber().onCompleted();
+                    } else {
+                        getStreamSubscriber().onError(status.asException());
+                    }
+                });
+            }
+        };
+    }
+
     private class ClientStreamObserverImpl extends 
CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
 
         private boolean metaSent;
@@ -73,31 +100,4 @@ public class ClientStream extends AbstractClientStream 
implements Stream {
             setCompressor(compressor);
         }
     }
-
-    @Override
-    protected TransportObserver createTransportObserver() {
-        return new AbstractTransportObserver() {
-
-            @Override
-            public void onData(byte[] data, boolean endStream) {
-                execute(() -> {
-                    final Object resp = deserializeResponse(data);
-                    getStreamSubscriber().onNext(resp);
-                });
-            }
-
-            @Override
-            public void onComplete() {
-                execute(() -> {
-                    final GrpcStatus status = 
extractStatusFromMeta(getHeaders());
-
-                    if (GrpcStatus.Code.isOk(status.code.code)) {
-                        getStreamSubscriber().onCompleted();
-                    } else {
-                        getStreamSubscriber().onError(status.asException());
-                    }
-                });
-            }
-        };
-    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
index 776fb89..c6a2791 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
@@ -36,6 +36,21 @@ public interface Compressor {
 
     String DEFAULT_COMPRESSOR = "identity";
 
+    static Compressor getCompressor(FrameworkModel frameworkModel, String 
compressorStr) {
+        if (null == compressorStr) {
+            return null;
+        }
+        return 
frameworkModel.getExtensionLoader(Compressor.class).getExtension(compressorStr);
+    }
+
+    static String getAcceptEncoding(FrameworkModel frameworkModel) {
+        Set<String> supportedEncodingSet = 
frameworkModel.getExtensionLoader(Compressor.class).getSupportedExtensions();
+        if (supportedEncodingSet.isEmpty()) {
+            return null;
+        }
+        return String.join(",", supportedEncodingSet);
+    }
+
     /**
      * message encoding of current compressor
      *
@@ -58,21 +73,4 @@ public interface Compressor {
      * @return decompressed payload byte array
      */
     byte[] decompress(byte[] payloadByteArr);
-
-
-    static Compressor getCompressor(FrameworkModel frameworkModel, String 
compressorStr) {
-        if (null == compressorStr) {
-            return null;
-        }
-        return 
frameworkModel.getExtensionLoader(Compressor.class).getExtension(compressorStr);
-    }
-
-
-    static String getAcceptEncoding(FrameworkModel frameworkModel) {
-        Set<String> supportedEncodingSet = 
frameworkModel.getExtensionLoader(Compressor.class).getSupportedExtensions();
-        if (supportedEncodingSet.isEmpty()) {
-            return null;
-        }
-        return String.join(",", supportedEncodingSet);
-    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdown.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdown.java
index 3743d75..f526969 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdown.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdown.java
@@ -45,13 +45,13 @@ public class GracefulShutdown {
 
     public void gracefulShutdown() {
         Http2GoAwayFrame goAwayFrame = new 
DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR, ByteBufUtil
-                .writeAscii(ctx.alloc(), goAwayMessage));
+            .writeAscii(ctx.alloc(), goAwayMessage));
         goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE);
         ctx.write(goAwayFrame);
         pingFuture = ctx.executor().schedule(
-                () -> secondGoAwayAndClose(ctx),
-                GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
-                TimeUnit.NANOSECONDS);
+            () -> secondGoAwayAndClose(ctx),
+            GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
+            TimeUnit.NANOSECONDS);
 
         Http2PingFrame pingFrame = new 
DefaultHttp2PingFrame(GRACEFUL_SHUTDOWN_PING, false);
         ctx.write(pingFrame);
@@ -67,7 +67,7 @@ public class GracefulShutdown {
 
         try {
             Http2GoAwayFrame goAwayFrame = new 
DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR,
-                    ByteBufUtil.writeAscii(this.ctx.alloc(), 
this.goAwayMessage));
+                ByteBufUtil.writeAscii(this.ctx.alloc(), this.goAwayMessage));
             ctx.write(goAwayFrame);
             ctx.flush();
             //TODO support customize graceful shutdown timeout mills
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
index 59122fe..4e1e5c3 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
@@ -30,10 +30,9 @@ public class GrpcDataDecoder extends 
ReplayingDecoder<GrpcDataDecoder.GrpcDecode
     private static final int RESERVED_MASK = 0xFE;
     private static final int COMPRESSED_FLAG_MASK = 1;
     private final int maxDataSize;
-
+    private final boolean client;
     private int len;
     private boolean compressedFlag;
-    private final boolean client;
 
     public GrpcDataDecoder(int maxDataSize, boolean client) {
         super(GrpcDecodeState.HEADER);
@@ -93,12 +92,6 @@ public class GrpcDataDecoder extends 
ReplayingDecoder<GrpcDataDecoder.GrpcDecode
         return compressor.decompress(data);
     }
 
-    enum GrpcDecodeState {
-        HEADER,
-        PAYLOAD
-    }
-
-
     private Compressor getDeCompressor(ChannelHandlerContext ctx, boolean 
client) {
         AbstractStream stream = client ? getClientStream(ctx) : 
getServerStream(ctx);
         return stream.getDeCompressor();
@@ -112,5 +105,10 @@ public class GrpcDataDecoder extends 
ReplayingDecoder<GrpcDataDecoder.GrpcDecode
         return ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
     }
 
+    enum GrpcDecodeState {
+        HEADER,
+        PAYLOAD
+    }
+
 
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
index 0df9771..c21ec32 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
@@ -16,12 +16,13 @@
  */
 package org.apache.dubbo.rpc.protocol.tri;
 
-import io.netty.handler.codec.http.QueryStringDecoder;
-import io.netty.handler.codec.http.QueryStringEncoder;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.rpc.RpcException;
 
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.QueryStringEncoder;
+
 import static org.apache.dubbo.rpc.RpcException.FORBIDDEN_EXCEPTION;
 import static org.apache.dubbo.rpc.RpcException.LIMIT_EXCEEDED_EXCEPTION;
 import static org.apache.dubbo.rpc.RpcException.METHOD_NOT_FOUND;
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
index f9e94ea..aca2b09 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
@@ -45,13 +45,6 @@ public class SingleProtobufUtils {
         ExtensionRegistryLite.getEmptyRegistry();
     private static final ConcurrentMap<Class<?>, SingleMessageMarshaller<?>> 
marshallers = new ConcurrentHashMap<>();
 
-    static boolean isSupported(Class<?> clazz) {
-        if (clazz == null) {
-            return false;
-        }
-        return MessageLite.class.isAssignableFrom(clazz);
-    }
-
     static {
         // Built-in types need to be registered in advance
         marshaller(Empty.getDefaultInstance());
@@ -66,6 +59,13 @@ public class SingleProtobufUtils {
         marshaller(ListValue.getDefaultInstance());
     }
 
+    static boolean isSupported(Class<?> clazz) {
+        if (clazz == null) {
+            return false;
+        }
+        return MessageLite.class.isAssignableFrom(clazz);
+    }
+
     public static <T extends MessageLite> void marshaller(T defaultInstance) {
         marshallers.put(defaultInstance.getClass(), new 
SingleMessageMarshaller<>(defaultInstance));
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 01415a5..015a87e 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -21,6 +21,13 @@ import io.netty.handler.codec.http2.Http2Error;
 
 public interface TransportObserver {
 
+    static int calcCompressFlag(Compressor compressor) {
+        if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
+            return 0;
+        }
+        return 1;
+    }
+
     void onMetadata(Metadata metadata, boolean endStream);
 
     void onData(byte[] data, boolean endStream);
@@ -31,12 +38,4 @@ public interface TransportObserver {
     default void onComplete() {
     }
 
-
-    static int calcCompressFlag(Compressor compressor) {
-        if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
-            return 0;
-        }
-        return 1;
-    }
-
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
index 0d94ac7..e8199b0 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
@@ -61,6 +61,12 @@ public enum TripleHeaderEnum {
         excludeAttachmentsSet.add(TripleConstant.TE_KEY);
     }
 
+    private final String header;
+
+    TripleHeaderEnum(String header) {
+        this.header = header;
+    }
+
     public static TripleHeaderEnum getEnum(String header) {
         return enumMap.get(header);
     }
@@ -73,12 +79,6 @@ public enum TripleHeaderEnum {
         return excludeAttachmentsSet.contains(key) || enumMap.containsKey(key);
     }
 
-    private final String header;
-
-    TripleHeaderEnum(String header) {
-        this.header = header;
-    }
-
     public String getHeader() {
         return header;
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 1fea532..c3e783a 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -52,7 +52,7 @@ import static 
org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
 public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TripleHttp2FrameServerHandler.class);
     private final PathResolver PATH_RESOLVER;
-    private FrameworkModel frameworkModel;
+    private final FrameworkModel frameworkModel;
 
     public TripleHttp2FrameServerHandler(FrameworkModel frameworkModel) {
         this.frameworkModel = frameworkModel;
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index e815521..8c301dc 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -63,15 +63,15 @@ public class TripleHttp2Protocol extends Http2WireProtocol 
implements ScopeModel
     public void configServerPipeline(URL url, ChannelPipeline pipeline, 
SslContext sslContext) {
         final Configuration config = 
ConfigurationUtils.getGlobalConfiguration(applicationModel);
         final Http2FrameCodec codec = Http2FrameCodecBuilder.forServer()
-                .gracefulShutdownTimeoutMillis(10000)
-                .initialSettings(new Http2Settings()
-                        
.headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
-                        
.maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, 
Integer.MAX_VALUE))
-                        
.initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
-                        
.maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 << 14))
-                        
.maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
-                .frameLogger(SERVER_LOGGER)
-                .build();
+            .gracefulShutdownTimeoutMillis(10000)
+            .initialSettings(new Http2Settings()
+                
.headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
+                
.maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, 
Integer.MAX_VALUE))
+                
.initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
+                .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 
<< 14))
+                
.maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
+            .frameLogger(SERVER_LOGGER)
+            .build();
         final Http2MultiplexHandler handler = new Http2MultiplexHandler(new 
TripleServerInitializer(frameworkModel));
         pipeline.addLast(codec, new TripleServerConnectionHandler(), handler);
     }
@@ -80,16 +80,16 @@ public class TripleHttp2Protocol extends Http2WireProtocol 
implements ScopeModel
     public void configClientPipeline(URL url, ChannelPipeline pipeline, 
SslContext sslContext) {
         final Configuration config = 
ConfigurationUtils.getGlobalConfiguration(applicationModel);
         final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
-                .gracefulShutdownTimeoutMillis(10000)
-                .initialSettings(new Http2Settings()
-                        
.headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
-                        
.pushEnabled(config.getBoolean(H2_SETTINGS_ENABLE_PUSH_KEY, false))
-                        
.maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, 
Integer.MAX_VALUE))
-                        
.initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
-                        
.maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 << 14))
-                        
.maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
-                .frameLogger(CLIENT_LOGGER)
-                .build();
+            .gracefulShutdownTimeoutMillis(10000)
+            .initialSettings(new Http2Settings()
+                
.headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
+                .pushEnabled(config.getBoolean(H2_SETTINGS_ENABLE_PUSH_KEY, 
false))
+                
.maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, 
Integer.MAX_VALUE))
+                
.initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
+                .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 
<< 14))
+                
.maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
+            .frameLogger(CLIENT_LOGGER)
+            .build();
         final Http2MultiplexHandler handler = new Http2MultiplexHandler(new 
TripleClientHandler(frameworkModel));
         pipeline.addLast(codec, handler, new 
TripleClientRequestHandler(frameworkModel));
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index 2011a04..a57f356 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -83,7 +83,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         
inv.setServiceModel(RpcContext.getServiceContext().getConsumerUrl().getServiceModel());
         inv.setAttachment(PATH_KEY, getUrl().getPath());
         inv.setAttachment(Constants.SERIALIZATION_KEY,
-                getUrl().getParameter(Constants.SERIALIZATION_KEY, 
Constants.DEFAULT_REMOTING_SERIALIZATION));
+            getUrl().getParameter(Constants.SERIALIZATION_KEY, 
Constants.DEFAULT_REMOTING_SERIALIZATION));
         try {
             int timeout = calculateTimeout(invocation, methodName);
             invocation.put(TIMEOUT_KEY, timeout);
@@ -126,12 +126,12 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
             return result;
         } catch (TimeoutException e) {
             throw new RpcException(RpcException.TIMEOUT_EXCEPTION,
-                    "Invoke remote method timeout. method: " + 
invocation.getMethodName() + ", provider: " + getUrl()
-                            + ", cause: " + e.getMessage(), e);
+                "Invoke remote method timeout. method: " + 
invocation.getMethodName() + ", provider: " + getUrl()
+                    + ", cause: " + e.getMessage(), e);
         } catch (RemotingException e) {
             throw new RpcException(RpcException.NETWORK_EXCEPTION,
-                    "Failed to invoke remote method: " + 
invocation.getMethodName() + ", provider: " + getUrl()
-                            + ", cause: " + e.getMessage(), e);
+                "Failed to invoke remote method: " + 
invocation.getMethodName() + ", provider: " + getUrl()
+                    + ", cause: " + e.getMessage(), e);
         }
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index 2d16e8e..6728b09 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -26,7 +26,7 @@ public class TripleServerInboundHandler extends 
ChannelInboundHandlerAdapter {
         final byte[] data = (byte[]) msg;
         if (serverStream != null) {
             serverStream.asTransportObserver()
-                    .onData(data, false);
+                .onData(data, false);
         }
     }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index c352b7a..f1f268c 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -70,10 +70,7 @@ public class TripleUtil {
         if (QUIET_EXCEPTIONS_CLASS.contains(t.getClass())) {
             return true;
         }
-        if (QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName())) {
-            return true;
-        }
-        return false;
+        return QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName());
     }
 
     /**
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index 452b514..e90f29e 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -68,8 +68,8 @@ public class UnaryClientStream extends AbstractClientStream 
implements Stream {
                     DefaultFuture2.received(getConnection(), response);
                 } catch (Exception e) {
                     final GrpcStatus status = 
GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
-                            .withCause(e)
-                            .withDescription("Failed to deserialize response");
+                        .withCause(e)
+                        .withDescription("Failed to deserialize response");
                     onError(status);
                 }
             });
@@ -114,7 +114,7 @@ public class UnaryClientStream extends AbstractClientStream 
implements Stream {
                 DebugInfo debugInfo = (DebugInfo) 
classObjectMap.get(DebugInfo.class);
                 if (debugInfo == null) {
                     return new RpcException(statusDetail.getCode(),
-                            statusDetail.getMessage());
+                        statusDetail.getMessage());
                 }
                 String msg = 
ExceptionUtils.getStackFrameString(debugInfo.getStackEntriesList());
                 return new RpcException(statusDetail.getCode(), msg);
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index b5ec80e..fde24a0 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -105,7 +105,7 @@ public class UnaryServerStream extends AbstractServerStream 
implements Stream {
                             
transportError(rpcExceptionCodeToGrpc(((RpcException) exception).getCode())
                                 .withCause(exception), 
response.getObjectAttachments());
                             final GrpcStatus status = 
rpcExceptionCodeToGrpc(((RpcException) exception).getCode())
-                                    .withCause(exception);
+                                .withCause(exception);
                             transportError(status, 
response.getObjectAttachments());
                         } else {
                             
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
@@ -139,7 +139,7 @@ public class UnaryServerStream extends AbstractServerStream 
implements Stream {
                     LOGGER.warn("Exception processing triple message", e);
                     if (e instanceof RpcException) {
                         final GrpcStatus status = 
rpcExceptionCodeToGrpc(((RpcException) e).getCode())
-                                .withCause(e);
+                            .withCause(e);
                         transportError(status, 
response.getObjectAttachments());
                     } else {
                         
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
index fff08c7..bc7d744 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
@@ -90,7 +90,7 @@ public class TriBuiltinService {
             Invoker<?> invoker = proxyFactory.getInvoker(healthService, 
Health.class, url);
             pathResolver.add(url.getServiceKey(), invoker);
             pathResolver.add(url.getServiceInterface(), invoker);
-            providerModel.setDestroyCaller(()->{
+            providerModel.setDestroyCaller(() -> {
                 pathResolver.remove(url.getServiceKey());
                 pathResolver.remove(url.getServiceInterface());
                 return null;
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
index 3c9c975..59bda55 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
@@ -43,10 +43,6 @@ public class TriHealthImpl implements Health {
     private final Map<String, HealthCheckResponse.ServingStatus> statusMap = 
new ConcurrentHashMap<>();
 
     private final Object watchLock = new Object();
-
-    // Indicates if future status changes should be ignored.
-    private boolean terminal;
-
     // Technically a Multimap<String, StreamObserver<HealthCheckResponse>>.  
The Boolean value is not
     // used.  The StreamObservers need to be kept in a identity-equality set, 
to make sure
     // user-defined equals() doesn't confuse our book-keeping of the 
StreamObservers.  Constructing
@@ -54,12 +50,19 @@ public class TriHealthImpl implements Health {
     // would rather not have the Guava collections dependency.
     private final HashMap<String, 
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean>>
         watchers = new HashMap<>();
+    // Indicates if future status changes should be ignored.
+    private boolean terminal;
 
     public TriHealthImpl() {
         // Copy of what Go and C++ do.
         statusMap.put(HealthStatusManager.SERVICE_NAME_ALL_SERVICES, 
HealthCheckResponse.ServingStatus.SERVING);
     }
 
+    private static HealthCheckResponse 
getResponseForWatch(HealthCheckResponse.ServingStatus recordedStatus) {
+        return HealthCheckResponse.newBuilder().setStatus(
+            recordedStatus == null ? 
HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN : recordedStatus).build();
+    }
+
     @Override
     public HealthCheckResponse check(HealthCheckRequest request) {
         HealthCheckResponse.ServingStatus status = 
statusMap.get(request.getService());
@@ -151,9 +154,4 @@ public class TriHealthImpl implements Health {
             }
         }
     }
-
-    private static HealthCheckResponse 
getResponseForWatch(HealthCheckResponse.ServingStatus recordedStatus) {
-        return HealthCheckResponse.newBuilder().setStatus(
-            recordedStatus == null ? 
HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN : recordedStatus).build();
-    }
 }

Reply via email to