This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 33af482 Format triple (#9058)
33af482 is described below
commit 33af482ce223999ebcc4cec0c04300767168829e
Author: GuoHao <[email protected]>
AuthorDate: Mon Oct 18 11:41:52 2021 +0800
Format triple (#9058)
---
.../rpc/protocol/tri/AbstractServerStream.java | 34 +++++++-------
.../dubbo/rpc/protocol/tri/AbstractStream.java | 33 ++++++-------
.../rpc/protocol/tri/CancelableStreamObserver.java | 4 +-
.../dubbo/rpc/protocol/tri/ClientStream.java | 54 +++++++++++-----------
.../apache/dubbo/rpc/protocol/tri/Compressor.java | 32 ++++++-------
.../dubbo/rpc/protocol/tri/GracefulShutdown.java | 10 ++--
.../dubbo/rpc/protocol/tri/GrpcDataDecoder.java | 14 +++---
.../apache/dubbo/rpc/protocol/tri/GrpcStatus.java | 5 +-
.../rpc/protocol/tri/SingleProtobufUtils.java | 14 +++---
.../dubbo/rpc/protocol/tri/TransportObserver.java | 15 +++---
.../dubbo/rpc/protocol/tri/TripleHeaderEnum.java | 12 ++---
.../tri/TripleHttp2FrameServerHandler.java | 2 +-
.../rpc/protocol/tri/TripleHttp2Protocol.java | 38 +++++++--------
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 10 ++--
.../protocol/tri/TripleServerInboundHandler.java | 2 +-
.../apache/dubbo/rpc/protocol/tri/TripleUtil.java | 5 +-
.../dubbo/rpc/protocol/tri/UnaryClientStream.java | 6 +--
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 4 +-
.../protocol/tri/service/TriBuiltinService.java | 2 +-
.../rpc/protocol/tri/service/TriHealthImpl.java | 16 +++----
20 files changed, 149 insertions(+), 163 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 153c556..567f45a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -47,7 +47,7 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
private final ProviderModel providerModel;
private List<MethodDescriptor> methodDescriptors;
private Invoker<?> invoker;
- private List<HeaderFilter> headerFilters;
+ private final List<HeaderFilter> headerFilters;
protected AbstractServerStream(URL url) {
this(url, lookupProviderModel(url));
@@ -69,7 +69,7 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
return null;
}
return (ExecutorService) providerModel.getServiceMetadata()
- .getAttribute(CommonConstants.THREADPOOL_KEY);
+ .getAttribute(CommonConstants.THREADPOOL_KEY);
}
public static UnaryServerStream unary(URL url) {
@@ -116,8 +116,8 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
protected RpcInvocation buildInvocation(Metadata metadata) {
RpcInvocation inv = new RpcInvocation(getUrl().getServiceModel(),
- getMethodName(), getServiceDescriptor().getServiceName(),
- getUrl().getProtocolServiceKey(), getMethodDescriptor().getParameterClasses(), new Object[0]);
+ getMethodName(), getServiceDescriptor().getServiceName(),
+ getUrl().getProtocolServiceKey(), getMethodDescriptor().getParameterClasses(), new Object[0]);
inv.setTargetServiceUniqueName(getUrl().getServiceKey());
inv.setReturnTypes(getMethodDescriptor().getReturnTypes());
@@ -138,12 +138,12 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
}
if (getMethodDescriptor() == null ||
getMethodDescriptor().isNeedWrap()) {
final TripleWrapper.TripleRequestWrapper wrapper =
TripleUtil.unpack(data,
- TripleWrapper.TripleRequestWrapper.class);
+ TripleWrapper.TripleRequestWrapper.class);
if
(!getSerializeType().equals(TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())))
{
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INVALID_ARGUMENT)
- .withDescription("Received inconsistent serialization type from client, " +
- "reject to deserialize! Expected:" + getSerializeType() +
- " Actual:" + TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())));
+ .withDescription("Received inconsistent serialization type from client, " +
+ "reject to deserialize! Expected:" + getSerializeType() +
+ " Actual:" + TripleUtil.convertHessianFromWrapper(wrapper.getSerializeType())));
return null;
}
if (getMethodDescriptor() == null) {
@@ -158,8 +158,8 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
}
if (getMethodDescriptor() == null) {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNIMPLEMENTED)
- .withDescription("Method :" + getMethodName() + "[" + Arrays.toString(paramTypes) + "] " +
- "not found of service:" + getServiceDescriptor().getServiceName()));
+ .withDescription("Method :" + getMethodName() + "[" + Arrays.toString(paramTypes) + "] " +
+ "not found of service:" + getServiceDescriptor().getServiceName()));
return null;
}
@@ -181,7 +181,7 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
protected Metadata createRequestMeta() {
Metadata metadata = new DefaultMetadata();
metadata.putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(),
super.getCompressor().getMessageEncoding())
- .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel()));
+ .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), Compressor.getAcceptEncoding(getUrl().getOrDefaultFrameworkModel()));
return metadata;
}
@@ -189,7 +189,7 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
final com.google.protobuf.Message message;
if (getMethodDescriptor().isNeedWrap()) {
message = TripleUtil.wrapResp(getUrl(), getSerializeType(), value,
getMethodDescriptor(),
- getMultipleSerialization());
+ getMultipleSerialization());
} else {
message = (Message) value;
}
@@ -206,19 +206,19 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
} catch (Throwable t) {
LOGGER.error("Exception processing triple message", t);
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Exception in invoker chain :" + t.getMessage())
- .withCause(t));
+ .withDescription("Exception in invoker chain :" + t.getMessage())
+ .withCause(t));
}
});
} catch (RejectedExecutionException e) {
LOGGER.error("Provider's thread pool is full", e);
transportError(GrpcStatus.fromCode(GrpcStatus.Code.RESOURCE_EXHAUSTED)
- .withDescription("Provider's thread pool is full"));
+ .withDescription("Provider's thread pool is full"));
} catch (Throwable t) {
LOGGER.error("Provider submit request to thread pool error ", t);
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withCause(t)
- .withDescription("Provider's error"));
+ .withCause(t)
+ .withDescription("Provider's error"));
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index c6bdde6..1f4f19a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -51,6 +51,7 @@ public abstract class AbstractStream implements Stream {
private final StreamObserver<Object> streamObserver;
private final TransportObserver transportObserver;
private final Executor executor;
+ private final CancellationContext cancellationContext;
private ServiceDescriptor serviceDescriptor;
private MethodDescriptor methodDescriptor;
private String methodName;
@@ -60,22 +61,12 @@ public abstract class AbstractStream implements Stream {
private TransportObserver transportSubscriber;
private Compressor compressor = IdentityCompressor.NONE;
private Compressor deCompressor = IdentityCompressor.NONE;
-
- private final CancellationContext cancellationContext;
private volatile boolean cancelled = false;
- public boolean isCancelled() {
- return cancelled;
- }
-
protected AbstractStream(URL url) {
this(url, null);
}
- protected CancellationContext getCancellationContext() {
- return cancellationContext;
- }
-
protected AbstractStream(URL url, Executor executor) {
this.url = url;
final Executor sourceExecutor = lookupExecutor(url, executor);
@@ -88,6 +79,13 @@ public abstract class AbstractStream implements Stream {
this.streamObserver = createStreamObserver();
}
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ protected CancellationContext getCancellationContext() {
+ return cancellationContext;
+ }
private Executor lookupExecutor(URL url, Executor executor) {
// only server maybe not null
@@ -205,6 +203,10 @@ public abstract class AbstractStream implements Stream {
this.serviceDescriptor = serviceDescriptor;
}
+ public Compressor getCompressor() {
+ return this.compressor;
+ }
+
/**
* set compressor if required
*
@@ -225,6 +227,9 @@ public abstract class AbstractStream implements Stream {
return this;
}
+ public Compressor getDeCompressor() {
+ return this.deCompressor;
+ }
protected AbstractStream setDeCompressor(Compressor compressor) {
// If compressor is NULL, this will not be set.
@@ -241,14 +246,6 @@ public abstract class AbstractStream implements Stream {
return this;
}
- public Compressor getCompressor() {
- return this.compressor;
- }
-
- public Compressor getDeCompressor() {
- return this.deCompressor;
- }
-
public URL getUrl() {
return url;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
index 379b002..7d107f9 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/CancelableStreamObserver.java
@@ -27,10 +27,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
public abstract class CancelableStreamObserver<T> implements StreamObserver<T>
{
private static final Logger LOGGER =
LoggerFactory.getLogger(CancelableStreamObserver.class);
-
- private CancellationContext cancellationContext;
-
private final AtomicBoolean contextSet = new AtomicBoolean(false);
+ private CancellationContext cancellationContext;
public CancelableStreamObserver() {
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
index 215eaba..42e7089 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStream.java
@@ -33,6 +33,33 @@ public class ClientStream extends AbstractClientStream
implements Stream {
return new ClientStreamObserverImpl(getCancellationContext());
}
+ @Override
+ protected TransportObserver createTransportObserver() {
+ return new AbstractTransportObserver() {
+
+ @Override
+ public void onData(byte[] data, boolean endStream) {
+ execute(() -> {
+ final Object resp = deserializeResponse(data);
+ getStreamSubscriber().onNext(resp);
+ });
+ }
+
+ @Override
+ public void onComplete() {
+ execute(() -> {
+ final GrpcStatus status = extractStatusFromMeta(getHeaders());
+
+ if (GrpcStatus.Code.isOk(status.code.code)) {
+ getStreamSubscriber().onCompleted();
+ } else {
+ getStreamSubscriber().onError(status.asException());
+ }
+ });
+ }
+ };
+ }
+
private class ClientStreamObserverImpl extends
CancelableStreamObserver<Object> implements ClientStreamObserver<Object> {
private boolean metaSent;
@@ -73,31 +100,4 @@ public class ClientStream extends AbstractClientStream
implements Stream {
setCompressor(compressor);
}
}
-
- @Override
- protected TransportObserver createTransportObserver() {
- return new AbstractTransportObserver() {
-
- @Override
- public void onData(byte[] data, boolean endStream) {
- execute(() -> {
- final Object resp = deserializeResponse(data);
- getStreamSubscriber().onNext(resp);
- });
- }
-
- @Override
- public void onComplete() {
- execute(() -> {
- final GrpcStatus status = extractStatusFromMeta(getHeaders());
-
- if (GrpcStatus.Code.isOk(status.code.code)) {
- getStreamSubscriber().onCompleted();
- } else {
- getStreamSubscriber().onError(status.asException());
- }
- });
- }
- };
- }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
index 776fb89..c6a2791 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Compressor.java
@@ -36,6 +36,21 @@ public interface Compressor {
String DEFAULT_COMPRESSOR = "identity";
+ static Compressor getCompressor(FrameworkModel frameworkModel, String compressorStr) {
+ if (null == compressorStr) {
+ return null;
+ }
+ return frameworkModel.getExtensionLoader(Compressor.class).getExtension(compressorStr);
+ }
+
+ static String getAcceptEncoding(FrameworkModel frameworkModel) {
+ Set<String> supportedEncodingSet = frameworkModel.getExtensionLoader(Compressor.class).getSupportedExtensions();
+ if (supportedEncodingSet.isEmpty()) {
+ return null;
+ }
+ return String.join(",", supportedEncodingSet);
+ }
+
/**
* message encoding of current compressor
*
@@ -58,21 +73,4 @@ public interface Compressor {
* @return decompressed payload byte array
*/
byte[] decompress(byte[] payloadByteArr);
-
-
- static Compressor getCompressor(FrameworkModel frameworkModel, String compressorStr) {
- if (null == compressorStr) {
- return null;
- }
- return frameworkModel.getExtensionLoader(Compressor.class).getExtension(compressorStr);
- }
-
-
- static String getAcceptEncoding(FrameworkModel frameworkModel) {
- Set<String> supportedEncodingSet = frameworkModel.getExtensionLoader(Compressor.class).getSupportedExtensions();
- if (supportedEncodingSet.isEmpty()) {
- return null;
- }
- return String.join(",", supportedEncodingSet);
- }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdown.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdown.java
index 3743d75..f526969 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdown.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GracefulShutdown.java
@@ -45,13 +45,13 @@ public class GracefulShutdown {
public void gracefulShutdown() {
Http2GoAwayFrame goAwayFrame = new
DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR, ByteBufUtil
- .writeAscii(ctx.alloc(), goAwayMessage));
+ .writeAscii(ctx.alloc(), goAwayMessage));
goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE);
ctx.write(goAwayFrame);
pingFuture = ctx.executor().schedule(
- () -> secondGoAwayAndClose(ctx),
- GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
- TimeUnit.NANOSECONDS);
+ () -> secondGoAwayAndClose(ctx),
+ GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
+ TimeUnit.NANOSECONDS);
Http2PingFrame pingFrame = new
DefaultHttp2PingFrame(GRACEFUL_SHUTDOWN_PING, false);
ctx.write(pingFrame);
@@ -67,7 +67,7 @@ public class GracefulShutdown {
try {
Http2GoAwayFrame goAwayFrame = new
DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR,
- ByteBufUtil.writeAscii(this.ctx.alloc(), this.goAwayMessage));
+ ByteBufUtil.writeAscii(this.ctx.alloc(), this.goAwayMessage));
ctx.write(goAwayFrame);
ctx.flush();
//TODO support customize graceful shutdown timeout mills
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
index 59122fe..4e1e5c3 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcDataDecoder.java
@@ -30,10 +30,9 @@ public class GrpcDataDecoder extends
ReplayingDecoder<GrpcDataDecoder.GrpcDecode
private static final int RESERVED_MASK = 0xFE;
private static final int COMPRESSED_FLAG_MASK = 1;
private final int maxDataSize;
-
+ private final boolean client;
private int len;
private boolean compressedFlag;
- private final boolean client;
public GrpcDataDecoder(int maxDataSize, boolean client) {
super(GrpcDecodeState.HEADER);
@@ -93,12 +92,6 @@ public class GrpcDataDecoder extends
ReplayingDecoder<GrpcDataDecoder.GrpcDecode
return compressor.decompress(data);
}
- enum GrpcDecodeState {
- HEADER,
- PAYLOAD
- }
-
-
private Compressor getDeCompressor(ChannelHandlerContext ctx, boolean
client) {
AbstractStream stream = client ? getClientStream(ctx) :
getServerStream(ctx);
return stream.getDeCompressor();
@@ -112,5 +105,10 @@ public class GrpcDataDecoder extends
ReplayingDecoder<GrpcDataDecoder.GrpcDecode
return ctx.channel().attr(TripleConstant.SERVER_STREAM_KEY).get();
}
+ enum GrpcDecodeState {
+ HEADER,
+ PAYLOAD
+ }
+
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
index 0df9771..c21ec32 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/GrpcStatus.java
@@ -16,12 +16,13 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.handler.codec.http.QueryStringDecoder;
-import io.netty.handler.codec.http.QueryStringEncoder;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.rpc.RpcException;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.QueryStringEncoder;
+
import static org.apache.dubbo.rpc.RpcException.FORBIDDEN_EXCEPTION;
import static org.apache.dubbo.rpc.RpcException.LIMIT_EXCEEDED_EXCEPTION;
import static org.apache.dubbo.rpc.RpcException.METHOD_NOT_FOUND;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
index f9e94ea..aca2b09 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/SingleProtobufUtils.java
@@ -45,13 +45,6 @@ public class SingleProtobufUtils {
ExtensionRegistryLite.getEmptyRegistry();
private static final ConcurrentMap<Class<?>, SingleMessageMarshaller<?>>
marshallers = new ConcurrentHashMap<>();
- static boolean isSupported(Class<?> clazz) {
- if (clazz == null) {
- return false;
- }
- return MessageLite.class.isAssignableFrom(clazz);
- }
-
static {
// Built-in types need to be registered in advance
marshaller(Empty.getDefaultInstance());
@@ -66,6 +59,13 @@ public class SingleProtobufUtils {
marshaller(ListValue.getDefaultInstance());
}
+ static boolean isSupported(Class<?> clazz) {
+ if (clazz == null) {
+ return false;
+ }
+ return MessageLite.class.isAssignableFrom(clazz);
+ }
+
public static <T extends MessageLite> void marshaller(T defaultInstance) {
marshallers.put(defaultInstance.getClass(), new
SingleMessageMarshaller<>(defaultInstance));
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 01415a5..015a87e 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -21,6 +21,13 @@ import io.netty.handler.codec.http2.Http2Error;
public interface TransportObserver {
+ static int calcCompressFlag(Compressor compressor) {
+ if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
+ return 0;
+ }
+ return 1;
+ }
+
void onMetadata(Metadata metadata, boolean endStream);
void onData(byte[] data, boolean endStream);
@@ -31,12 +38,4 @@ public interface TransportObserver {
default void onComplete() {
}
-
- static int calcCompressFlag(Compressor compressor) {
- if (null == compressor || IdentityCompressor.NONE.equals(compressor)) {
- return 0;
- }
- return 1;
- }
-
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
index 0d94ac7..e8199b0 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
@@ -61,6 +61,12 @@ public enum TripleHeaderEnum {
excludeAttachmentsSet.add(TripleConstant.TE_KEY);
}
+ private final String header;
+
+ TripleHeaderEnum(String header) {
+ this.header = header;
+ }
+
public static TripleHeaderEnum getEnum(String header) {
return enumMap.get(header);
}
@@ -73,12 +79,6 @@ public enum TripleHeaderEnum {
return excludeAttachmentsSet.contains(key) || enumMap.containsKey(key);
}
- private final String header;
-
- TripleHeaderEnum(String header) {
- this.header = header;
- }
-
public String getHeader() {
return header;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 1fea532..c3e783a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -52,7 +52,7 @@ import static
org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(TripleHttp2FrameServerHandler.class);
private final PathResolver PATH_RESOLVER;
- private FrameworkModel frameworkModel;
+ private final FrameworkModel frameworkModel;
public TripleHttp2FrameServerHandler(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index e815521..8c301dc 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -63,15 +63,15 @@ public class TripleHttp2Protocol extends Http2WireProtocol
implements ScopeModel
public void configServerPipeline(URL url, ChannelPipeline pipeline,
SslContext sslContext) {
final Configuration config =
ConfigurationUtils.getGlobalConfiguration(applicationModel);
final Http2FrameCodec codec = Http2FrameCodecBuilder.forServer()
- .gracefulShutdownTimeoutMillis(10000)
- .initialSettings(new Http2Settings()
- .headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
- .maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
- .initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
- .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 << 14))
- .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
- .frameLogger(SERVER_LOGGER)
- .build();
+ .gracefulShutdownTimeoutMillis(10000)
+ .initialSettings(new Http2Settings()
+ .headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
+ .maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
+ .initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
+ .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 << 14))
+ .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
+ .frameLogger(SERVER_LOGGER)
+ .build();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(new
TripleServerInitializer(frameworkModel));
pipeline.addLast(codec, new TripleServerConnectionHandler(), handler);
}
@@ -80,16 +80,16 @@ public class TripleHttp2Protocol extends Http2WireProtocol
implements ScopeModel
public void configClientPipeline(URL url, ChannelPipeline pipeline,
SslContext sslContext) {
final Configuration config =
ConfigurationUtils.getGlobalConfiguration(applicationModel);
final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
- .gracefulShutdownTimeoutMillis(10000)
- .initialSettings(new Http2Settings()
- .headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
- .pushEnabled(config.getBoolean(H2_SETTINGS_ENABLE_PUSH_KEY, false))
- .maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
- .initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
- .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 << 14))
- .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
- .frameLogger(CLIENT_LOGGER)
- .build();
+ .gracefulShutdownTimeoutMillis(10000)
+ .initialSettings(new Http2Settings()
+ .headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
+ .pushEnabled(config.getBoolean(H2_SETTINGS_ENABLE_PUSH_KEY, false))
+ .maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
+ .initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
+ .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 << 14))
+ .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
+ .frameLogger(CLIENT_LOGGER)
+ .build();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(new
TripleClientHandler(frameworkModel));
pipeline.addLast(codec, handler, new
TripleClientRequestHandler(frameworkModel));
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index 2011a04..a57f356 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -83,7 +83,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
inv.setServiceModel(RpcContext.getServiceContext().getConsumerUrl().getServiceModel());
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.SERIALIZATION_KEY,
- getUrl().getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
+ getUrl().getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
try {
int timeout = calculateTimeout(invocation, methodName);
invocation.put(TIMEOUT_KEY, timeout);
@@ -126,12 +126,12 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
return result;
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION,
- "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl()
- + ", cause: " + e.getMessage(), e);
+ "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl()
+ + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION,
- "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl()
- + ", cause: " + e.getMessage(), e);
+ "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl()
+ + ", cause: " + e.getMessage(), e);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
index 2d16e8e..6728b09 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerInboundHandler.java
@@ -26,7 +26,7 @@ public class TripleServerInboundHandler extends
ChannelInboundHandlerAdapter {
final byte[] data = (byte[]) msg;
if (serverStream != null) {
serverStream.asTransportObserver()
- .onData(data, false);
+ .onData(data, false);
}
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
index c352b7a..f1f268c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleUtil.java
@@ -70,10 +70,7 @@ public class TripleUtil {
if (QUIET_EXCEPTIONS_CLASS.contains(t.getClass())) {
return true;
}
- if (QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName())) {
- return true;
- }
- return false;
+ return QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName());
}
/**
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index 452b514..e90f29e 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -68,8 +68,8 @@ public class UnaryClientStream extends AbstractClientStream
implements Stream {
DefaultFuture2.received(getConnection(), response);
} catch (Exception e) {
final GrpcStatus status =
GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withCause(e)
- .withDescription("Failed to deserialize response");
+ .withCause(e)
+ .withDescription("Failed to deserialize response");
onError(status);
}
});
@@ -114,7 +114,7 @@ public class UnaryClientStream extends AbstractClientStream
implements Stream {
DebugInfo debugInfo = (DebugInfo)
classObjectMap.get(DebugInfo.class);
if (debugInfo == null) {
return new RpcException(statusDetail.getCode(),
- statusDetail.getMessage());
+ statusDetail.getMessage());
}
String msg =
ExceptionUtils.getStackFrameString(debugInfo.getStackEntriesList());
return new RpcException(statusDetail.getCode(), msg);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index b5ec80e..fde24a0 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -105,7 +105,7 @@ public class UnaryServerStream extends AbstractServerStream
implements Stream {
transportError(rpcExceptionCodeToGrpc(((RpcException) exception).getCode())
.withCause(exception),
response.getObjectAttachments());
final GrpcStatus status =
rpcExceptionCodeToGrpc(((RpcException) exception).getCode())
- .withCause(exception);
+ .withCause(exception);
transportError(status,
response.getObjectAttachments());
} else {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
@@ -139,7 +139,7 @@ public class UnaryServerStream extends AbstractServerStream
implements Stream {
LOGGER.warn("Exception processing triple message", e);
if (e instanceof RpcException) {
final GrpcStatus status =
rpcExceptionCodeToGrpc(((RpcException) e).getCode())
- .withCause(e);
+ .withCause(e);
transportError(status,
response.getObjectAttachments());
} else {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.UNKNOWN)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
index fff08c7..bc7d744 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriBuiltinService.java
@@ -90,7 +90,7 @@ public class TriBuiltinService {
Invoker<?> invoker = proxyFactory.getInvoker(healthService,
Health.class, url);
pathResolver.add(url.getServiceKey(), invoker);
pathResolver.add(url.getServiceInterface(), invoker);
- providerModel.setDestroyCaller(()->{
+ providerModel.setDestroyCaller(() -> {
pathResolver.remove(url.getServiceKey());
pathResolver.remove(url.getServiceInterface());
return null;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
index 3c9c975..59bda55 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/service/TriHealthImpl.java
@@ -43,10 +43,6 @@ public class TriHealthImpl implements Health {
private final Map<String, HealthCheckResponse.ServingStatus> statusMap =
new ConcurrentHashMap<>();
private final Object watchLock = new Object();
-
- // Indicates if future status changes should be ignored.
- private boolean terminal;
-
// Technically a Multimap<String, StreamObserver<HealthCheckResponse>>.
The Boolean value is not
// used. The StreamObservers need to be kept in a identity-equality set,
to make sure
// user-defined equals() doesn't confuse our book-keeping of the
StreamObservers. Constructing
@@ -54,12 +50,19 @@ public class TriHealthImpl implements Health {
// would rather not have the Guava collections dependency.
private final HashMap<String,
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean>>
watchers = new HashMap<>();
+ // Indicates if future status changes should be ignored.
+ private boolean terminal;
public TriHealthImpl() {
// Copy of what Go and C++ do.
statusMap.put(HealthStatusManager.SERVICE_NAME_ALL_SERVICES,
HealthCheckResponse.ServingStatus.SERVING);
}
+ private static HealthCheckResponse getResponseForWatch(HealthCheckResponse.ServingStatus recordedStatus) {
+ return HealthCheckResponse.newBuilder().setStatus(
+ recordedStatus == null ? HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN : recordedStatus).build();
+ }
+
@Override
public HealthCheckResponse check(HealthCheckRequest request) {
HealthCheckResponse.ServingStatus status =
statusMap.get(request.getService());
@@ -151,9 +154,4 @@ public class TriHealthImpl implements Health {
}
}
}
-
- private static HealthCheckResponse getResponseForWatch(HealthCheckResponse.ServingStatus recordedStatus) {
- return HealthCheckResponse.newBuilder().setStatus(
- recordedStatus == null ? HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN : recordedStatus).build();
- }
}