This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 0f4dc26 [3.0-Triple] Add timeout check before send response (#9498)
0f4dc26 is described below
commit 0f4dc265c089b80dd31fe06e1ecd9c9902e75dd7
Author: GuoHao <[email protected]>
AuthorDate: Mon Jan 17 11:16:55 2022 +0800
[3.0-Triple] Add timeout check before send response (#9498)
* Add timeout check before send response
* Fix ut
* fix ut
* Fix ut
* fix npe
* fix style
* Fix ut
* Do not send timeout when no timeout set
Co-authored-by: earthchen <[email protected]>
---
.../rpc/protocol/tri/AbstractClientStream.java | 7 +--
.../rpc/protocol/tri/AbstractServerStream.java | 50 +++++++++++++++++++---
.../dubbo/rpc/protocol/tri/AbstractStream.java | 17 +++++++-
.../apache/dubbo/rpc/protocol/tri/Metadata.java | 6 +++
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 33 +++++++++-----
.../rpc/protocol/tri/command/QueuedCommand.java | 17 +++++---
.../dubbo/rpc/protocol/tri/ClientStreamTest.java | 2 +-
.../dubbo/rpc/protocol/tri/ServerStreamTest.java | 1 +
.../dubbo/rpc/protocol/tri/TripleProtocolTest.java | 7 ++-
.../dubbo/rpc/protocol/tri/WriteQueueTest.java | 1 +
10 files changed, 110 insertions(+), 31 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 7beacae..eefa588 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -402,10 +402,11 @@ public abstract class AbstractClientStream extends
AbstractStream implements Str
.put(Http2Headers.PseudoHeaderName.METHOD.value(),
HttpMethod.POST.asciiName());
metadata.put(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(),
TripleConstant.CONTENT_PROTO)
- .put(TripleHeaderEnum.TIMEOUT.getHeader(),
inv.get(CommonConstants.TIMEOUT_KEY) + "m")
- .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS)
- ;
+ .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
+ if (inv.get(CommonConstants.TIMEOUT_KEY) != null) {
+ metadata.put(TripleHeaderEnum.TIMEOUT.getHeader(),
inv.get(CommonConstants.TIMEOUT_KEY) + "m");
+ }
metadata.putIfNotNull(TripleHeaderEnum.SERVICE_VERSION.getHeader(),
getUrl().getVersion())
.putIfNotNull(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader(),
(String)
inv.getObjectAttachments().remove(CommonConstants.APPLICATION_KEY))
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index c9c56fe..b9ab313 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -20,6 +20,7 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.serialize.MultipleSerialization;
+import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.HeaderFilter;
import org.apache.dubbo.rpc.Invoker;
@@ -34,9 +35,6 @@ import org.apache.dubbo.triple.TripleWrapper;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http2.Http2Headers;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -44,11 +42,14 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import static
org.apache.dubbo.common.constants.CommonConstants.HEADER_FILTER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
public abstract class AbstractServerStream extends AbstractStream implements
Stream {
@@ -146,6 +147,19 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
final Map<String, Object> attachments =
parseMetadataToAttachmentMap(metadata);
inv.setObjectAttachments(attachments);
+ // handle timeout
+ CharSequence timeout =
metadata.get(TripleHeaderEnum.TIMEOUT.getHeader());
+ try {
+ if (!Objects.isNull(timeout)) {
+ final Long timeoutInNanos =
parseTimeoutToNanos(timeout.toString());
+ if (!Objects.isNull(timeoutInNanos)) {
+ inv.setAttachment(TIMEOUT_KEY, timeoutInNanos);
+ }
+ }
+ } catch (Throwable t) {
+ LOGGER.warn(String.format("Failed to parse request timeout set
from:%s, service=%s method=%s", timeout,
getServiceDescriptor().getServiceName(),
+ getMethodName()));
+ }
invokeHeaderFilter(inv);
return inv;
}
@@ -299,14 +313,37 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
* create basic meta data
*/
protected Metadata createResponseMeta() {
- Metadata metadata = new DefaultMetadata();
- metadata.put(Http2Headers.PseudoHeaderName.STATUS.value(),
HttpResponseStatus.OK.codeAsText());
- metadata.put(HttpHeaderNames.CONTENT_TYPE,
TripleConstant.CONTENT_PROTO);
+ Metadata metadata = createDefaultMetadata();
metadata.putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(),
super.getCompressor().getMessageEncoding())
.putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(),
getAcceptEncoding());
return metadata;
}
+ protected Long parseTimeoutToNanos(String timeoutVal) {
+ if (StringUtils.isEmpty(timeoutVal) ||
StringUtils.isContains(timeoutVal, "null")) {
+ return null;
+ }
+ long value = Long.parseLong(timeoutVal.substring(0,
timeoutVal.length() - 1));
+ char unit = timeoutVal.charAt(timeoutVal.length() - 1);
+ switch (unit) {
+ case 'n':
+ return value;
+ case 'u':
+ return TimeUnit.MICROSECONDS.toNanos(value);
+ case 'm':
+ return TimeUnit.MILLISECONDS.toNanos(value);
+ case 'S':
+ return TimeUnit.SECONDS.toNanos(value);
+ case 'M':
+ return TimeUnit.MINUTES.toNanos(value);
+ case 'H':
+ return TimeUnit.HOURS.toNanos(value);
+ default:
+ // invalid timeout config
+ return null;
+ }
+ }
+
protected byte[] encodeResponse(Object value) {
final ClassLoader tccl =
Thread.currentThread().getContextClassLoader();
try {
@@ -381,6 +418,7 @@ public abstract class AbstractServerStream extends
AbstractStream implements Str
.withCause(throwable));
}
+
public TripleWrapper.TripleResponseWrapper wrapResp(URL url, String
serializeType, Object resp,
MethodDescriptor desc,
MultipleSerialization
multipleSerialization) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index c03a4a1..dcf4082 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -31,6 +31,8 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
import com.google.rpc.Status;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.Http2Headers;
import java.io.ByteArrayInputStream;
@@ -302,7 +304,7 @@ public abstract class AbstractStream implements Stream {
protected void transportError(GrpcStatus status, Map<String, Object>
attachments, boolean onlyTrailers) {
if (!onlyTrailers) {
// set metadata
- Metadata metadata = new DefaultMetadata();
+ Metadata metadata = createDefaultMetadata();
outboundTransportObserver().onMetadata(metadata, false);
}
// set trailers
@@ -359,6 +361,19 @@ public abstract class AbstractStream implements Stream {
return metadata;
}
+
+ /**
+ * default header
+ * <p>
+ * only status and content-type
+ */
+ protected Metadata createDefaultMetadata() {
+ Metadata metadata = new DefaultMetadata();
+ metadata.put(Http2Headers.PseudoHeaderName.STATUS.value(),
HttpResponseStatus.OK.codeAsText());
+ metadata.put(HttpHeaderNames.CONTENT_TYPE,
TripleConstant.CONTENT_PROTO);
+ return metadata;
+ }
+
/**
* Parse metadata to a KV pairs map.
*
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Metadata.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Metadata.java
index cb16191..f7c49f7 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Metadata.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Metadata.java
@@ -18,6 +18,7 @@
package org.apache.dubbo.rpc.protocol.tri;
import java.util.Map;
+import java.util.Optional;
public interface Metadata extends Iterable<Map.Entry<CharSequence,
CharSequence>> {
@@ -32,6 +33,11 @@ public interface Metadata extends
Iterable<Map.Entry<CharSequence, CharSequence>
CharSequence get(CharSequence key);
+
+ default CharSequence getOrDefault(CharSequence key, CharSequence val) {
+ return Optional.ofNullable(get(key)).orElse(val);
+ }
+
boolean contains(CharSequence key);
boolean remove(CharSequence key);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 688091c..04412a5 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.rpc.RpcInvocation;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.getStatus;
public class UnaryServerStream extends AbstractServerStream implements Stream {
@@ -68,6 +69,7 @@ public class UnaryServerStream extends AbstractServerStream
implements Stream {
if (invocation == null) {
return;
}
+ final long stInNano = System.nanoTime();
final Result result = getInvoker().invoke(invocation);
CompletionStage<Object> future =
result.thenApply(Function.identity());
future.whenComplete((o, throwable) -> {
@@ -81,17 +83,28 @@ public class UnaryServerStream extends AbstractServerStream
implements Stream {
transportError(getStatus(response.getException()));
return;
}
- Metadata metadata = createResponseMeta();
- outboundTransportObserver().onMetadata(metadata, false);
- final byte[] data = encodeResponse(response.getValue());
- if (data == null) {
- // already handled in encodeResponse()
- return;
+ final Object timeoutVal =
invocation.getObjectAttachment(TIMEOUT_KEY);
+ final long cost = System.nanoTime() - stInNano;
+ if (timeoutVal != null && cost > ((Long) timeoutVal)) {
+ LOGGER.error(String.format("Invoke timeout at server side,
ignored to send response. service=%s method=%s cost=%s timeout=%s",
+ invocation.getTargetServiceUniqueName(),
+ invocation.getMethodName(),
+ cost, timeoutVal));
+ outboundTransportObserver()
+
.onError(GrpcStatus.fromCode(GrpcStatus.Code.DEADLINE_EXCEEDED));
+ } else {
+ Metadata metadata = createResponseMeta();
+ outboundTransportObserver().onMetadata(metadata, false);
+ final byte[] data = encodeResponse(response.getValue());
+ if (data == null) {
+ // already handled in encodeResponse()
+ return;
+ }
+ outboundTransportObserver().onData(data, false);
+ Metadata trailers =
TripleConstant.getSuccessResponseMeta();
+ convertAttachment(trailers,
response.getObjectAttachments());
+ outboundTransportObserver().onMetadata(trailers, true);
}
- outboundTransportObserver().onData(data, false);
- Metadata trailers = TripleConstant.getSuccessResponseMeta();
- convertAttachment(trailers, response.getObjectAttachments());
- outboundTransportObserver().onMetadata(trailers, true);
});
RpcContext.removeContext();
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
index c06ecfa..7fb44de 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
@@ -33,9 +33,8 @@ public interface QueuedCommand {
abstract class AbstractQueuedCommand implements QueuedCommand {
- private ChannelPromise promise;
-
protected boolean flush = false;
+ private ChannelPromise promise;
@Override
public ChannelPromise promise() {
@@ -53,13 +52,19 @@ public interface QueuedCommand {
@Override
public void run(Channel channel) {
- channel.write(this, promise);
+ if (channel.isActive()) {
+ channel.write(this, promise);
+ } else {
+ promise.trySuccess();
+ }
}
public final void send(ChannelHandlerContext ctx, ChannelPromise
promise) {
- doSend(ctx, promise);
- if (flush) {
- ctx.flush();
+ if (ctx.channel().isActive()) {
+ doSend(ctx, promise);
+ if (flush) {
+ ctx.flush();
+ }
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamTest.java
index 2138737..3db5fb7 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamTest.java
@@ -69,7 +69,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
import static org.apache.dubbo.rpc.protocol.tri.TripleConstant.HTTP_SCHEME;
import static org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum.GRPC_ENCODING;
@@ -333,6 +332,7 @@ public class ClientStreamTest {
Mockito.when(streamChannel.writeAndFlush(Mockito.any())).thenReturn(channelFuture);
Mockito.when(streamChannel.alloc()).thenReturn(ByteBufAllocator.DEFAULT);
Mockito.when(streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY)).thenReturn(attribute);
+ Mockito.when(streamChannel.isActive()).thenReturn(true);
Mockito.when(streamChannel.eventLoop()).thenReturn(eventLoop);
Mockito.when(streamChannel.newPromise()).thenReturn(promise);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamTest.java
index d034495..3081e18 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamTest.java
@@ -262,6 +262,7 @@ public class ServerStreamTest {
Mockito.when(streamChannel.writeAndFlush(Mockito.any())).thenReturn(channelFuture);
Mockito.when(streamChannel.alloc()).thenReturn(ByteBufAllocator.DEFAULT);
Mockito.when(streamChannel.attr(TripleConstant.SERVER_STREAM_KEY)).thenReturn(attribute);
+ Mockito.when(streamChannel.isActive()).thenReturn(true);
Mockito.when(streamChannel.eventLoop()).thenReturn(eventLoop);
Mockito.when(streamChannel.newPromise()).thenReturn(promise);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
index 083dabe..ce5ffcf 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
@@ -18,7 +18,6 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Protocol;
@@ -40,9 +39,6 @@ import java.util.concurrent.TimeUnit;
public class TripleProtocolTest {
- private Protocol protocol =
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
- private ProxyFactory proxy =
ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
- private final String REQUEST_MSG = "hello world";
@Test
public void testDemoProtocol() throws Exception {
@@ -64,6 +60,8 @@ public class TripleProtocolTest {
serviceRepository.registerProvider(providerModel);
url = url.setServiceModel(providerModel);
+ Protocol protocol
=ApplicationModel.defaultModel().getExtensionLoader(Protocol.class).getAdaptiveExtension();
+ ProxyFactory proxy =
ApplicationModel.defaultModel().getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
protocol.export(proxy.getInvoker(serviceImpl, IGreeter.class, url));
ConsumerModel consumerModel = new ConsumerModel(url.getServiceKey(),
null, serviceDescriptor, null,
@@ -73,6 +71,7 @@ public class TripleProtocolTest {
Thread.sleep(1000);
// 1. test unaryStream
+ String REQUEST_MSG = "hello world";
Assertions.assertEquals(REQUEST_MSG, greeterProxy.echo(REQUEST_MSG));
Assertions.assertEquals(REQUEST_MSG,
serviceImpl.echoAsync(REQUEST_MSG).get());
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/WriteQueueTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/WriteQueueTest.java
index 9f5dc2b..1cf928a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/WriteQueueTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/WriteQueueTest.java
@@ -54,6 +54,7 @@ public class WriteQueueTest {
ChannelPromise promise = Mockito.mock(ChannelPromise.class);
EventLoop eventLoop = new DefaultEventLoop();
Mockito.when(channel.eventLoop()).thenReturn(eventLoop);
+ Mockito.when(channel.isActive()).thenReturn(true);
Mockito.when(channel.newPromise()).thenReturn(promise);
Mockito.when(channel.write(Mockito.any(), Mockito.any())).thenAnswer(
(Answer<ChannelPromise>) invocationOnMock -> {