This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 0f4dc26  [3.0-Triple] Add timeout check before send response (#9498)
0f4dc26 is described below

commit 0f4dc265c089b80dd31fe06e1ecd9c9902e75dd7
Author: GuoHao <[email protected]>
AuthorDate: Mon Jan 17 11:16:55 2022 +0800

    [3.0-Triple] Add timeout check before send response (#9498)
    
    * Add timeout check before send response
    
    * Fix ut
    
    * fix ut
    
    * Fix ut
    
    * fix npe
    
    * fix style
    
    * Fix ut
    
    * Do not send timeout when no timeout set
    
    Co-authored-by: earthchen <[email protected]>
---
 .../rpc/protocol/tri/AbstractClientStream.java     |  7 +--
 .../rpc/protocol/tri/AbstractServerStream.java     | 50 +++++++++++++++++++---
 .../dubbo/rpc/protocol/tri/AbstractStream.java     | 17 +++++++-
 .../apache/dubbo/rpc/protocol/tri/Metadata.java    |  6 +++
 .../dubbo/rpc/protocol/tri/UnaryServerStream.java  | 33 +++++++++-----
 .../rpc/protocol/tri/command/QueuedCommand.java    | 17 +++++---
 .../dubbo/rpc/protocol/tri/ClientStreamTest.java   |  2 +-
 .../dubbo/rpc/protocol/tri/ServerStreamTest.java   |  1 +
 .../dubbo/rpc/protocol/tri/TripleProtocolTest.java |  7 ++-
 .../dubbo/rpc/protocol/tri/WriteQueueTest.java     |  1 +
 10 files changed, 110 insertions(+), 31 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index 7beacae..eefa588 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -402,10 +402,11 @@ public abstract class AbstractClientStream extends 
AbstractStream implements Str
             .put(Http2Headers.PseudoHeaderName.METHOD.value(), 
HttpMethod.POST.asciiName());
 
         metadata.put(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), 
TripleConstant.CONTENT_PROTO)
-            .put(TripleHeaderEnum.TIMEOUT.getHeader(), 
inv.get(CommonConstants.TIMEOUT_KEY) + "m")
-            .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS)
-        ;
+            .put(HttpHeaderNames.TE, HttpHeaderValues.TRAILERS);
 
+        if (inv.get(CommonConstants.TIMEOUT_KEY) != null) {
+            metadata.put(TripleHeaderEnum.TIMEOUT.getHeader(), 
inv.get(CommonConstants.TIMEOUT_KEY) + "m");
+        }
         metadata.putIfNotNull(TripleHeaderEnum.SERVICE_VERSION.getHeader(), 
getUrl().getVersion())
             .putIfNotNull(TripleHeaderEnum.CONSUMER_APP_NAME_KEY.getHeader(),
                 (String) 
inv.getObjectAttachments().remove(CommonConstants.APPLICATION_KEY))
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index c9c56fe..b9ab313 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -20,6 +20,7 @@ package org.apache.dubbo.rpc.protocol.tri;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.serialize.MultipleSerialization;
+import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.HeaderFilter;
 import org.apache.dubbo.rpc.Invoker;
@@ -34,9 +35,6 @@ import org.apache.dubbo.triple.TripleWrapper;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http2.Http2Headers;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -44,11 +42,14 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.dubbo.common.constants.CommonConstants.HEADER_FILTER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
 
 public abstract class AbstractServerStream extends AbstractStream implements 
Stream {
 
@@ -146,6 +147,19 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
 
         final Map<String, Object> attachments = 
parseMetadataToAttachmentMap(metadata);
         inv.setObjectAttachments(attachments);
+        // handle timeout
+        CharSequence timeout = 
metadata.get(TripleHeaderEnum.TIMEOUT.getHeader());
+        try {
+            if (!Objects.isNull(timeout)) {
+                final Long timeoutInNanos = 
parseTimeoutToNanos(timeout.toString());
+                if (!Objects.isNull(timeoutInNanos)) {
+                    inv.setAttachment(TIMEOUT_KEY, timeoutInNanos);
+                }
+            }
+        } catch (Throwable t) {
+            LOGGER.warn(String.format("Failed to parse request timeout set 
from:%s, service=%s method=%s", timeout, 
getServiceDescriptor().getServiceName(),
+                getMethodName()));
+        }
         invokeHeaderFilter(inv);
         return inv;
     }
@@ -299,14 +313,37 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
      * create basic meta data
      */
     protected Metadata createResponseMeta() {
-        Metadata metadata = new DefaultMetadata();
-        metadata.put(Http2Headers.PseudoHeaderName.STATUS.value(), 
HttpResponseStatus.OK.codeAsText());
-        metadata.put(HttpHeaderNames.CONTENT_TYPE, 
TripleConstant.CONTENT_PROTO);
+        Metadata metadata = createDefaultMetadata();
         metadata.putIfNotNull(TripleHeaderEnum.GRPC_ENCODING.getHeader(), 
super.getCompressor().getMessageEncoding())
             .putIfNotNull(TripleHeaderEnum.GRPC_ACCEPT_ENCODING.getHeader(), 
getAcceptEncoding());
         return metadata;
     }
 
+    protected Long parseTimeoutToNanos(String timeoutVal) {
+        if (StringUtils.isEmpty(timeoutVal) || 
StringUtils.isContains(timeoutVal, "null")) {
+            return null;
+        }
+        long value = Long.parseLong(timeoutVal.substring(0, 
timeoutVal.length() - 1));
+        char unit = timeoutVal.charAt(timeoutVal.length() - 1);
+        switch (unit) {
+            case 'n':
+                return value;
+            case 'u':
+                return TimeUnit.MICROSECONDS.toNanos(value);
+            case 'm':
+                return TimeUnit.MILLISECONDS.toNanos(value);
+            case 'S':
+                return TimeUnit.SECONDS.toNanos(value);
+            case 'M':
+                return TimeUnit.MINUTES.toNanos(value);
+            case 'H':
+                return TimeUnit.HOURS.toNanos(value);
+            default:
+                // invalid timeout config
+                return null;
+        }
+    }
+
     protected byte[] encodeResponse(Object value) {
         final ClassLoader tccl = 
Thread.currentThread().getContextClassLoader();
         try {
@@ -381,6 +418,7 @@ public abstract class AbstractServerStream extends 
AbstractStream implements Str
                 .withCause(throwable));
     }
 
+
     public TripleWrapper.TripleResponseWrapper wrapResp(URL url, String 
serializeType, Object resp,
                                                         MethodDescriptor desc,
                                                         MultipleSerialization 
multipleSerialization) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index c03a4a1..dcf4082 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -31,6 +31,8 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
 import com.google.protobuf.Any;
 import com.google.rpc.DebugInfo;
 import com.google.rpc.Status;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http2.Http2Headers;
 
 import java.io.ByteArrayInputStream;
@@ -302,7 +304,7 @@ public abstract class AbstractStream implements Stream {
     protected void transportError(GrpcStatus status, Map<String, Object> 
attachments, boolean onlyTrailers) {
         if (!onlyTrailers) {
             // set metadata
-            Metadata metadata = new DefaultMetadata();
+            Metadata metadata = createDefaultMetadata();
             outboundTransportObserver().onMetadata(metadata, false);
         }
         // set trailers
@@ -359,6 +361,19 @@ public abstract class AbstractStream implements Stream {
         return metadata;
     }
 
+
+    /**
+     * default header
+     * <p>
+     * only status and content-type
+     */
+    protected Metadata createDefaultMetadata() {
+        Metadata metadata = new DefaultMetadata();
+        metadata.put(Http2Headers.PseudoHeaderName.STATUS.value(), 
HttpResponseStatus.OK.codeAsText());
+        metadata.put(HttpHeaderNames.CONTENT_TYPE, 
TripleConstant.CONTENT_PROTO);
+        return metadata;
+    }
+
     /**
      * Parse metadata to a KV pairs map.
      *
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Metadata.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Metadata.java
index cb16191..f7c49f7 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Metadata.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Metadata.java
@@ -18,6 +18,7 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 import java.util.Map;
+import java.util.Optional;
 
 public interface Metadata extends Iterable<Map.Entry<CharSequence, 
CharSequence>> {
 
@@ -32,6 +33,11 @@ public interface Metadata extends 
Iterable<Map.Entry<CharSequence, CharSequence>
 
     CharSequence get(CharSequence key);
 
+
+    default CharSequence getOrDefault(CharSequence key, CharSequence val) {
+        return Optional.ofNullable(get(key)).orElse(val);
+    }
+
     boolean contains(CharSequence key);
 
     boolean remove(CharSequence key);
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 688091c..04412a5 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.rpc.RpcInvocation;
 import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
 
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
 import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.getStatus;
 
 public class UnaryServerStream extends AbstractServerStream implements Stream {
@@ -68,6 +69,7 @@ public class UnaryServerStream extends AbstractServerStream 
implements Stream {
             if (invocation == null) {
                 return;
             }
+            final long stInNano = System.nanoTime();
             final Result result = getInvoker().invoke(invocation);
             CompletionStage<Object> future = 
result.thenApply(Function.identity());
             future.whenComplete((o, throwable) -> {
@@ -81,17 +83,28 @@ public class UnaryServerStream extends AbstractServerStream 
implements Stream {
                     transportError(getStatus(response.getException()));
                     return;
                 }
-                Metadata metadata = createResponseMeta();
-                outboundTransportObserver().onMetadata(metadata, false);
-                final byte[] data = encodeResponse(response.getValue());
-                if (data == null) {
-                    // already handled in encodeResponse()
-                    return;
+                final Object timeoutVal = 
invocation.getObjectAttachment(TIMEOUT_KEY);
+                final long cost = System.nanoTime() - stInNano;
+                if (timeoutVal != null && cost > ((Long) timeoutVal)) {
+                    LOGGER.error(String.format("Invoke timeout at server side, 
ignored to send response. service=%s method=%s cost=%s timeout=%s",
+                        invocation.getTargetServiceUniqueName(),
+                        invocation.getMethodName(),
+                        cost, timeoutVal));
+                    outboundTransportObserver()
+                        
.onError(GrpcStatus.fromCode(GrpcStatus.Code.DEADLINE_EXCEEDED));
+                } else {
+                    Metadata metadata = createResponseMeta();
+                    outboundTransportObserver().onMetadata(metadata, false);
+                    final byte[] data = encodeResponse(response.getValue());
+                    if (data == null) {
+                        // already handled in encodeResponse()
+                        return;
+                    }
+                    outboundTransportObserver().onData(data, false);
+                    Metadata trailers = 
TripleConstant.getSuccessResponseMeta();
+                    convertAttachment(trailers, 
response.getObjectAttachments());
+                    outboundTransportObserver().onMetadata(trailers, true);
                 }
-                outboundTransportObserver().onData(data, false);
-                Metadata trailers = TripleConstant.getSuccessResponseMeta();
-                convertAttachment(trailers, response.getObjectAttachments());
-                outboundTransportObserver().onMetadata(trailers, true);
             });
             RpcContext.removeContext();
         }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
index c06ecfa..7fb44de 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/QueuedCommand.java
@@ -33,9 +33,8 @@ public interface QueuedCommand {
 
     abstract class AbstractQueuedCommand implements QueuedCommand {
 
-        private ChannelPromise promise;
-
         protected boolean flush = false;
+        private ChannelPromise promise;
 
         @Override
         public ChannelPromise promise() {
@@ -53,13 +52,19 @@ public interface QueuedCommand {
 
         @Override
         public void run(Channel channel) {
-            channel.write(this, promise);
+            if (channel.isActive()) {
+                channel.write(this, promise);
+            } else {
+                promise.trySuccess();
+            }
         }
 
         public final void send(ChannelHandlerContext ctx, ChannelPromise 
promise) {
-            doSend(ctx, promise);
-            if (flush) {
-                ctx.flush();
+            if (ctx.channel().isActive()) {
+                doSend(ctx, promise);
+                if (flush) {
+                    ctx.flush();
+                }
             }
         }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamTest.java
index 2138737..3db5fb7 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamTest.java
@@ -69,7 +69,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.dubbo.rpc.protocol.tri.Compressor.DEFAULT_COMPRESSOR;
 import static org.apache.dubbo.rpc.protocol.tri.TripleConstant.HTTP_SCHEME;
 import static org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum.GRPC_ENCODING;
 
@@ -333,6 +332,7 @@ public class ClientStreamTest {
         
Mockito.when(streamChannel.writeAndFlush(Mockito.any())).thenReturn(channelFuture);
         
Mockito.when(streamChannel.alloc()).thenReturn(ByteBufAllocator.DEFAULT);
         
Mockito.when(streamChannel.attr(TripleConstant.CLIENT_STREAM_KEY)).thenReturn(attribute);
+        Mockito.when(streamChannel.isActive()).thenReturn(true);
         Mockito.when(streamChannel.eventLoop()).thenReturn(eventLoop);
         Mockito.when(streamChannel.newPromise()).thenReturn(promise);
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamTest.java
index d034495..3081e18 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/ServerStreamTest.java
@@ -262,6 +262,7 @@ public class ServerStreamTest {
         
Mockito.when(streamChannel.writeAndFlush(Mockito.any())).thenReturn(channelFuture);
         
Mockito.when(streamChannel.alloc()).thenReturn(ByteBufAllocator.DEFAULT);
         
Mockito.when(streamChannel.attr(TripleConstant.SERVER_STREAM_KEY)).thenReturn(attribute);
+        Mockito.when(streamChannel.isActive()).thenReturn(true);
         Mockito.when(streamChannel.eventLoop()).thenReturn(eventLoop);
         Mockito.when(streamChannel.newPromise()).thenReturn(promise);
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
index 083dabe..ce5ffcf 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocolTest.java
@@ -18,7 +18,6 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.rpc.Protocol;
@@ -40,9 +39,6 @@ import java.util.concurrent.TimeUnit;
 
 
 public class TripleProtocolTest {
-    private Protocol protocol = 
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
-    private ProxyFactory proxy = 
ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
-    private final String REQUEST_MSG = "hello world";
 
     @Test
     public void testDemoProtocol() throws Exception {
@@ -64,6 +60,8 @@ public class TripleProtocolTest {
         serviceRepository.registerProvider(providerModel);
         url = url.setServiceModel(providerModel);
 
+        Protocol protocol 
=ApplicationModel.defaultModel().getExtensionLoader(Protocol.class).getAdaptiveExtension();
+        ProxyFactory proxy = 
ApplicationModel.defaultModel().getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
         protocol.export(proxy.getInvoker(serviceImpl, IGreeter.class, url));
 
         ConsumerModel consumerModel = new ConsumerModel(url.getServiceKey(), 
null, serviceDescriptor, null,
@@ -73,6 +71,7 @@ public class TripleProtocolTest {
         Thread.sleep(1000);
 
         // 1. test unaryStream
+        String REQUEST_MSG = "hello world";
         Assertions.assertEquals(REQUEST_MSG, greeterProxy.echo(REQUEST_MSG));
         Assertions.assertEquals(REQUEST_MSG, 
serviceImpl.echoAsync(REQUEST_MSG).get());
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/WriteQueueTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/WriteQueueTest.java
index 9f5dc2b..1cf928a 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/WriteQueueTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/WriteQueueTest.java
@@ -54,6 +54,7 @@ public class WriteQueueTest {
         ChannelPromise promise = Mockito.mock(ChannelPromise.class);
         EventLoop eventLoop = new DefaultEventLoop();
         Mockito.when(channel.eventLoop()).thenReturn(eventLoop);
+        Mockito.when(channel.isActive()).thenReturn(true);
         Mockito.when(channel.newPromise()).thenReturn(promise);
         Mockito.when(channel.write(Mockito.any(), Mockito.any())).thenAnswer(
             (Answer<ChannelPromise>) invocationOnMock -> {

Reply via email to