This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 5d5d0365b5 Triple http limiting the size of the HTTP request and 
response (#14246)
5d5d0365b5 is described below

commit 5d5d0365b54f842cda0952f3199e8924129f21d8
Author: TomlongTK <[email protected]>
AuthorDate: Tue Jun 11 19:08:41 2024 +0800

    Triple http limiting the size of the HTTP request and response (#14246)
    
    * Triple http limiting the size of the HTTP request and response
    
    * Limit http1 and http2 response body
    
    * Native http2 unary calls use Http2ServerUnaryChannelObserver; the flag 
should also be set when the http1 connection is disconnected
    
    * Format code
    
    * Fix some problems
    
    * Revert netty new api
    
    * Code format
    
    ---------
    
    Co-authored-by: earthchen <[email protected]>
---
 .../http12/AbstractServerHttpChannelObserver.java  | 35 +++++++++++--
 .../dubbo/remoting/http12/HttpChannelObserver.java |  2 +-
 .../remoting/http12/HttpTransportListener.java     |  3 +-
 .../apache/dubbo/remoting/http12/HttpUtils.java    |  6 +--
 .../http12/LimitedByteBufOutputStream.java         | 58 ++++++++++++++++++++++
 .../HttpOverPayloadException.java}                 |  8 +--
 .../http12/h2/Http2ServerChannelObserver.java      | 25 ++--------
 .../remoting/http12/h2/Http2TransportListener.java |  5 +-
 .../remoting/http12/message/codec/BinaryCodec.java |  3 ++
 .../remoting/http12/message/codec/HtmlCodec.java   |  3 ++
 .../remoting/http12/message/codec/JsonCodec.java   |  9 +++-
 .../remoting/http12/message/codec/JsonPbCodec.java |  5 ++
 .../http12/message/codec/PlainTextCodec.java       |  3 ++
 .../http12/message/codec/UrlEncodeFormCodec.java   |  5 ++
 .../remoting/http12/message/codec/XmlCodec.java    |  5 ++
 .../remoting/http12/message/codec/YamlCodec.java   |  9 ++++
 .../http12/netty4/h1/NettyHttp1Channel.java        | 11 ++--
 .../netty4/h1/NettyHttp1ConnectionHandler.java     |  8 ++-
 .../http12/netty4/h2/NettyH2StreamChannel.java     | 10 +++-
 .../http12/netty4/h2/NettyHttp2FrameCodec.java     |  2 +-
 .../h2/NettyHttp2ProtocolSelectorHandler.java      |  9 +++-
 .../dubbo/rpc/protocol/tri/TripleHeaderEnum.java   |  3 +-
 .../rpc/protocol/tri/TripleHttp2Protocol.java      | 20 ++++----
 .../h12/grpc/GrpcHttp2ServerTransportListener.java | 10 ++++
 .../tri/h12/grpc/GrpcServerChannelObserver.java    | 15 ++++--
 .../DefaultHttp11ServerTransportListener.java      |  5 ++
 .../http2/GenericHttp2ServerTransportListener.java | 44 +++++++++++-----
 .../h12/http2/Http2ServerUnaryChannelObserver.java | 52 +++++++++++++++++++
 .../protocol/tri/rest/RestHttpMessageCodec.java    |  3 ++
 29 files changed, 303 insertions(+), 73 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index 26ce8b705e..fb9c54f79c 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -35,6 +35,10 @@ public abstract class AbstractServerHttpChannelObserver 
implements CustomizableH
 
     private boolean headerSent;
 
+    private boolean completed;
+
+    private boolean closed;
+
     protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) {
         this.httpChannel = httpChannel;
     }
@@ -69,6 +73,9 @@ public abstract class AbstractServerHttpChannelObserver 
implements CustomizableH
 
     @Override
     public final void onNext(Object data) {
+        if (closed) {
+            return;
+        }
         try {
             doOnNext(data);
         } catch (Throwable e) {
@@ -85,9 +92,12 @@ public abstract class AbstractServerHttpChannelObserver 
implements CustomizableH
 
     @Override
     public final void onError(Throwable throwable) {
+        if (closed) {
+            return;
+        }
         if (throwable instanceof HttpResultPayloadException) {
             onNext(((HttpResultPayloadException) throwable).getResult());
-            doOnCompleted(null);
+            onCompleted(null);
             return;
         }
         try {
@@ -95,7 +105,7 @@ public abstract class AbstractServerHttpChannelObserver 
implements CustomizableH
         } catch (Throwable ex) {
             throwable = new EncodeException(ex);
         } finally {
-            doOnCompleted(throwable);
+            onCompleted(throwable);
         }
     }
 
@@ -110,7 +120,17 @@ public abstract class AbstractServerHttpChannelObserver 
implements CustomizableH
 
     @Override
     public final void onCompleted() {
-        doOnCompleted(null);
+        if (closed) {
+            return;
+        }
+        onCompleted(null);
+    }
+
+    private void onCompleted(Throwable throwable) {
+        if (!completed) {
+            doOnCompleted(throwable);
+            completed = true;
+        }
     }
 
     protected void doOnCompleted(Throwable throwable) {
@@ -198,4 +218,13 @@ public abstract class AbstractServerHttpChannelObserver 
implements CustomizableH
         getHttpChannel().writeHeader(httpMetadata);
         headerSent = true;
     }
+
+    @Override
+    public void close() throws Exception {
+        closed();
+    }
+
+    protected final void closed() {
+        closed = true;
+    }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
index 4cb0342642..294d7311bf 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
@@ -18,7 +18,7 @@ package org.apache.dubbo.remoting.http12;
 
 import org.apache.dubbo.common.stream.StreamObserver;
 
-public interface HttpChannelObserver<T> extends StreamObserver<T> {
+public interface HttpChannelObserver<T> extends StreamObserver<T>, 
AutoCloseable {
 
     HttpChannel getHttpChannel();
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
index 9265a3ba93..57e1419eb0 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpTransportListener.java
@@ -16,7 +16,8 @@
  */
 package org.apache.dubbo.remoting.http12;
 
-public interface HttpTransportListener<HEADER extends HttpMetadata, MESSAGE 
extends HttpInputMessage> {
+public interface HttpTransportListener<HEADER extends HttpMetadata, MESSAGE 
extends HttpInputMessage>
+        extends AutoCloseable {
 
     void onMetadata(HEADER metadata);
 
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java
index a25c107669..23c53f8a3f 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpUtils.java
@@ -185,14 +185,14 @@ public final class HttpUtils {
     }
 
     public static HttpRequest.FileUpload readUpload(InterfaceHttpData item) {
-        return new DefaultFileUploadAdaptee((FileUpload) item);
+        return new DefaultFileUploadAdapter((FileUpload) item);
     }
 
-    private static class DefaultFileUploadAdaptee implements 
HttpRequest.FileUpload {
+    private static class DefaultFileUploadAdapter implements 
HttpRequest.FileUpload {
         private final FileUpload fu;
         private InputStream inputStream;
 
-        DefaultFileUploadAdaptee(FileUpload fu) {
+        DefaultFileUploadAdapter(FileUpload fu) {
             this.fu = fu;
         }
 
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java
new file mode 100644
index 0000000000..ac70f88f42
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/LimitedByteBufOutputStream.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12;
+
+import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException;
+
+import java.io.IOException;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+
+public class LimitedByteBufOutputStream extends ByteBufOutputStream {
+
+    private final int capacity;
+
+    public LimitedByteBufOutputStream(ByteBuf byteBuf, int capacity) {
+        super(byteBuf);
+        this.capacity = capacity == 0 ? Integer.MAX_VALUE : capacity;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        ensureCapacity(1);
+        super.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        ensureCapacity(b.length);
+        super.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        ensureCapacity(len);
+        super.write(b, off, len);
+    }
+
+    private void ensureCapacity(int len) {
+        if (writtenBytes() + len > capacity) {
+            throw new HttpOverPayloadException("Response Entity Too Large");
+        }
+    }
+}
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/exception/HttpOverPayloadException.java
similarity index 79%
copy from 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
copy to 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/exception/HttpOverPayloadException.java
index 09ad7fe422..c98d51ba4b 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/exception/HttpOverPayloadException.java
@@ -14,9 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.remoting.http12.h2;
+package org.apache.dubbo.remoting.http12.exception;
 
-public interface Http2TransportListener extends 
CancelableTransportListener<Http2Header, Http2InputMessage> {
+public class HttpOverPayloadException extends HttpStatusException {
 
-    void onStreamClosed();
+    public HttpOverPayloadException(String message) {
+        super(500, message);
+    }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index 61568cc5bb..bd43f2ad92 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -37,8 +37,6 @@ public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserve
 
     private boolean autoRequestN = true;
 
-    private boolean closed = false;
-
     public Http2ServerChannelObserver(H2StreamChannel h2StreamChannel) {
         super(h2StreamChannel);
     }
@@ -78,7 +76,7 @@ public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserve
     public void cancel(Throwable throwable) {
         if (throwable instanceof CancelStreamException) {
             if (((CancelStreamException) throwable).isCancelByRemote()) {
-                closed = true;
+                closed();
             }
         }
         this.cancellationContext.cancel(throwable);
@@ -89,22 +87,6 @@ public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserve
         getHttpChannel().writeResetFrame(errorCode);
     }
 
-    @Override
-    public void doOnNext(Object data) throws Throwable {
-        if (closed) {
-            return;
-        }
-        super.doOnNext(data);
-    }
-
-    @Override
-    public void doOnError(Throwable throwable) throws Throwable {
-        if (closed) {
-            return;
-        }
-        super.doOnError(throwable);
-    }
-
     @Override
     public void request(int count) {
         this.streamingDecoder.request(count);
@@ -120,8 +102,9 @@ public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserve
         return autoRequestN;
     }
 
-    public void onStreamClosed() {
-        closed = true;
+    @Override
+    public void close() throws Exception {
+        super.close();
         streamingDecoder.onStreamClosed();
     }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
index 09ad7fe422..16531e7a93 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
@@ -16,7 +16,4 @@
  */
 package org.apache.dubbo.remoting.http12.h2;
 
-public interface Http2TransportListener extends 
CancelableTransportListener<Http2Header, Http2InputMessage> {
-
-    void onStreamClosed();
-}
+public interface Http2TransportListener extends 
CancelableTransportListener<Http2Header, Http2InputMessage> {}
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java
index bb3ee1af5c..b939927764 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/BinaryCodec.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.http12.message.codec;
 import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 
@@ -49,6 +50,8 @@ public class BinaryCodec implements HttpMessageCodec {
     public Object decode(InputStream is, Class<?> targetType, Charset charset) 
throws DecodeException {
         try {
             return StreamUtils.readBytes(is);
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Exception e) {
             throw new DecodeException(e);
         }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java
index ac1ae805ab..ca3fff4f10 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/HtmlCodec.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.http12.message.codec;
 import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 
@@ -48,6 +49,8 @@ public class HtmlCodec implements HttpMessageCodec {
             if (targetType == String.class) {
                 return StreamUtils.toString(is, charset);
             }
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Exception e) {
             throw new DecodeException(e);
         }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java
index aea3fdb66a..c16dbe2b24 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonCodec.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.common.utils.JsonUtils;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 
@@ -35,6 +36,8 @@ public class JsonCodec implements HttpMessageCodec {
     public void encode(OutputStream os, Object data, Charset charset) throws 
EncodeException {
         try {
             os.write(JsonUtils.toJson(data).getBytes(charset));
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Throwable t) {
             throw new EncodeException("Error encoding json", t);
         }
@@ -43,6 +46,8 @@ public class JsonCodec implements HttpMessageCodec {
     public void encode(OutputStream os, Object[] data, Charset charset) throws 
EncodeException {
         try {
             os.write(JsonUtils.toJson(data).getBytes(charset));
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Throwable t) {
             throw new EncodeException("Error encoding json", t);
         }
@@ -52,6 +57,8 @@ public class JsonCodec implements HttpMessageCodec {
     public Object decode(InputStream is, Class<?> targetType, Charset charset) 
throws DecodeException {
         try {
             return JsonUtils.toJavaObject(StreamUtils.toString(is, charset), 
targetType);
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Throwable t) {
             throw new DecodeException("Error decoding json", t);
         }
@@ -78,7 +85,7 @@ public class JsonCodec implements HttpMessageCodec {
                 return new Object[] {JsonUtils.convertObject(obj, 
targetTypes[0])};
             }
             throw new DecodeException("Json must be array");
-        } catch (DecodeException e) {
+        } catch (HttpStatusException e) {
             throw e;
         } catch (Throwable t) {
             throw new DecodeException("Error decoding json", t);
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
index 86131abb99..a3b2dbdea4 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/JsonPbCodec.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.common.utils.MethodUtils;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -54,6 +55,8 @@ public final class JsonPbCodec extends JsonCodec {
                 
JsonFormat.parser().ignoringUnknownFields().merge(StreamUtils.toString(is, 
charset), newBuilder);
                 return newBuilder.build();
             }
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Throwable e) {
             throw new DecodeException("Error decoding jsonPb", e);
         }
@@ -67,6 +70,8 @@ public final class JsonPbCodec extends JsonCodec {
                 // protobuf only support one parameter
                 return new Object[] {decode(is, targetTypes[0], charset)};
             }
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Throwable e) {
             throw new DecodeException("Error decoding jsonPb", e);
         }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java
index 33d067686e..48e41c657d 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/PlainTextCodec.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.remoting.http12.message.codec;
 import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 
@@ -51,6 +52,8 @@ public final class PlainTextCodec implements HttpMessageCodec 
{
             if (targetType == String.class) {
                 return StreamUtils.toString(is, charset);
             }
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Exception e) {
             throw new DecodeException(e);
         }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java
index bd51a2c91e..643bc6211e 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/UrlEncodeFormCodec.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.convert.ConverterUtil;
 import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 
@@ -63,6 +64,8 @@ public class UrlEncodeFormCodec implements HttpMessageCodec {
             } else {
                 throw new EncodeException("UrlEncodeFrom media-type only 
supports String or Map as return type.");
             }
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Exception e) {
             throw new EncodeException(e);
         }
@@ -99,6 +102,8 @@ public class UrlEncodeFormCodec implements HttpMessageCodec {
             } else {
                 return res.values().toArray();
             }
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Exception e) {
             throw new DecodeException(e);
         }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java
index d63728083a..3627aea6e4 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/XmlCodec.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.remoting.http12.message.codec;
 
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 
@@ -47,6 +48,8 @@ public class XmlCodec implements HttpMessageCodec {
             try (OutputStreamWriter writer = new OutputStreamWriter(os, 
charset)) {
                 marshaller.marshal(data, writer);
             }
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Exception e) {
             throw new EncodeException("Error encoding xml", e);
         }
@@ -63,6 +66,8 @@ public class XmlCodec implements HttpMessageCodec {
                 Unmarshaller unmarshaller = context.createUnmarshaller();
                 return unmarshaller.unmarshal(xmlSource);
             }
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Exception e) {
             throw new DecodeException("Error decoding xml", e);
         }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java
index 30380eb159..9362bcaf8b 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/codec/YamlCodec.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.utils.ClassUtils;
 import org.apache.dubbo.common.utils.DefaultSerializeClassChecker;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 
@@ -43,6 +44,8 @@ public class YamlCodec implements HttpMessageCodec {
     public Object decode(InputStream is, Class<?> targetType, Charset charset) 
throws DecodeException {
         try (InputStreamReader reader = new InputStreamReader(is, charset)) {
             return createYaml().loadAs(reader, (Class) targetType);
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Throwable t) {
             throw new DecodeException("Error decoding yaml", t);
         }
@@ -69,6 +72,8 @@ public class YamlCodec implements HttpMessageCodec {
                 }
             }
             return results;
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Throwable t) {
             throw new DecodeException("Error decoding yaml", t);
         }
@@ -78,6 +83,8 @@ public class YamlCodec implements HttpMessageCodec {
     public void encode(OutputStream os, Object data, Charset charset) throws 
EncodeException {
         try (OutputStreamWriter writer = new OutputStreamWriter(os, charset)) {
             createYaml().dump(data, writer);
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Throwable t) {
             throw new EncodeException("Error encoding yaml", t);
         }
@@ -87,6 +94,8 @@ public class YamlCodec implements HttpMessageCodec {
     public void encode(OutputStream os, Object[] data, Charset charset) throws 
EncodeException {
         try (OutputStreamWriter writer = new OutputStreamWriter(os, charset)) {
             createYaml().dump(data, writer);
+        } catch (HttpStatusException e) {
+            throw e;
         } catch (Throwable t) {
             throw new EncodeException("Error encoding yaml", t);
         }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java
index 443bd21aec..086dde1afd 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1Channel.java
@@ -16,24 +16,28 @@
  */
 package org.apache.dubbo.remoting.http12.netty4.h1;
 
+import org.apache.dubbo.config.nested.TripleConfig;
 import org.apache.dubbo.remoting.http12.HttpChannel;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
 import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.LimitedByteBufOutputStream;
 import org.apache.dubbo.remoting.http12.h1.Http1OutputMessage;
 import org.apache.dubbo.remoting.http12.netty4.NettyHttpChannelFutureListener;
 
 import java.net.SocketAddress;
 import java.util.concurrent.CompletableFuture;
 
-import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.Channel;
 
 public class NettyHttp1Channel implements HttpChannel {
 
     private final Channel channel;
 
-    public NettyHttp1Channel(Channel channel) {
+    private final TripleConfig tripleConfig;
+
+    public NettyHttp1Channel(Channel channel, TripleConfig tripleConfig) {
         this.channel = channel;
+        this.tripleConfig = tripleConfig;
     }
 
     @Override
@@ -52,7 +56,8 @@ public class NettyHttp1Channel implements HttpChannel {
 
     @Override
     public HttpOutputMessage newOutputMessage() {
-        return new Http1OutputMessage(new 
ByteBufOutputStream(channel.alloc().buffer()));
+        return new Http1OutputMessage(
+                new LimitedByteBufOutputStream(channel.alloc().buffer(), 
tripleConfig.getMaxResponseBodySize()));
     }
 
     @Override
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
index 7380024f7c..452faba1b2 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h1/NettyHttp1ConnectionHandler.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.remoting.http12.netty4.h1;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.nested.TripleConfig;
 import org.apache.dubbo.remoting.http12.h1.Http1Request;
 import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
 import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListenerFactory;
@@ -33,12 +34,16 @@ public class NettyHttp1ConnectionHandler extends 
SimpleChannelInboundHandler<Htt
 
     private final Http1ServerTransportListenerFactory 
http1ServerTransportListenerFactory;
 
+    private final TripleConfig tripleConfig;
+
     public NettyHttp1ConnectionHandler(
             URL url,
             FrameworkModel frameworkModel,
+            TripleConfig tripleConfig,
             Http1ServerTransportListenerFactory 
http1ServerTransportListenerFactory) {
         this.url = url;
         this.frameworkModel = frameworkModel;
+        this.tripleConfig = tripleConfig;
         this.http1ServerTransportListenerFactory = 
http1ServerTransportListenerFactory;
     }
 
@@ -47,8 +52,9 @@ public class NettyHttp1ConnectionHandler extends 
SimpleChannelInboundHandler<Htt
      */
     protected void channelRead0(ChannelHandlerContext ctx, Http1Request 
http1Request) {
         Http1ServerTransportListener http1TransportListener = 
http1ServerTransportListenerFactory.newInstance(
-                new NettyHttp1Channel(ctx.channel()), url, frameworkModel);
+                new NettyHttp1Channel(ctx.channel(), tripleConfig), url, 
frameworkModel);
         http1TransportListener.onMetadata(http1Request);
         http1TransportListener.onData(http1Request);
+        ctx.channel().closeFuture().addListener(future -> 
http1TransportListener.close());
     }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java
index f85b127634..edebd985cc 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java
@@ -16,8 +16,10 @@
  */
 package org.apache.dubbo.remoting.http12.netty4.h2;
 
+import org.apache.dubbo.config.nested.TripleConfig;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
 import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.LimitedByteBufOutputStream;
 import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
 import org.apache.dubbo.remoting.http12.h2.Http2OutputMessage;
 import org.apache.dubbo.remoting.http12.h2.Http2OutputMessageFrame;
@@ -35,8 +37,11 @@ public class NettyH2StreamChannel implements H2StreamChannel 
{
 
     private final Http2StreamChannel http2StreamChannel;
 
-    public NettyH2StreamChannel(Http2StreamChannel http2StreamChannel) {
+    private final TripleConfig tripleConfig;
+
+    public NettyH2StreamChannel(Http2StreamChannel http2StreamChannel, 
TripleConfig tripleConfig) {
         this.http2StreamChannel = http2StreamChannel;
+        this.tripleConfig = tripleConfig;
     }
 
     @Override
@@ -57,7 +62,8 @@ public class NettyH2StreamChannel implements H2StreamChannel {
     @Override
     public Http2OutputMessage newOutputMessage(boolean endStream) {
         ByteBuf buffer = http2StreamChannel.alloc().buffer();
-        ByteBufOutputStream outputStream = new ByteBufOutputStream(buffer);
+        ByteBufOutputStream outputStream =
+                new LimitedByteBufOutputStream(buffer, 
tripleConfig.getMaxResponseBodySize());
         return new Http2OutputMessageFrame(outputStream, endStream);
     }
 
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
index 9496c94a00..ccac52f5ff 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2FrameCodec.java
@@ -91,7 +91,7 @@ public class NettyHttp2FrameCodec extends 
ChannelDuplexHandler {
 
     private Http2HeadersFrame encodeHttp2HeadersFrame(Http2Header http2Header) 
{
         HttpHeaders headers = http2Header.headers();
-        DefaultHttp2Headers http2Headers = new DefaultHttp2Headers();
+        DefaultHttp2Headers http2Headers = new DefaultHttp2Headers(false);
         for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
             String name = entry.getKey();
             List<String> value = entry.getValue();
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
index 582b656272..26fda3e87e 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.remoting.http12.netty4.h2;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.config.nested.TripleConfig;
 import org.apache.dubbo.remoting.http12.HttpHeaderNames;
 import org.apache.dubbo.remoting.http12.HttpHeaders;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
@@ -42,14 +43,18 @@ public class NettyHttp2ProtocolSelectorHandler extends 
SimpleChannelInboundHandl
 
     private final FrameworkModel frameworkModel;
 
+    private final TripleConfig tripleConfig;
+
     private final Http2ServerTransportListenerFactory 
defaultHttp2ServerTransportListenerFactory;
 
     public NettyHttp2ProtocolSelectorHandler(
             URL url,
             FrameworkModel frameworkModel,
+            TripleConfig tripleConfig,
             Http2ServerTransportListenerFactory 
defaultHttp2ServerTransportListenerFactory) {
         this.url = url;
         this.frameworkModel = frameworkModel;
+        this.tripleConfig = tripleConfig;
         this.defaultHttp2ServerTransportListenerFactory = 
defaultHttp2ServerTransportListenerFactory;
     }
 
@@ -61,7 +66,7 @@ public class NettyHttp2ProtocolSelectorHandler extends 
SimpleChannelInboundHandl
         if (factory == null) {
             throw new UnsupportedMediaTypeException(contentType);
         }
-        H2StreamChannel h2StreamChannel = new 
NettyH2StreamChannel((Http2StreamChannel) ctx.channel());
+        H2StreamChannel h2StreamChannel = new 
NettyH2StreamChannel((Http2StreamChannel) ctx.channel(), tripleConfig);
         HttpWriteQueueHandler writeQueueHandler =
                 
ctx.channel().parent().pipeline().get(HttpWriteQueueHandler.class);
         if (writeQueueHandler != null) {
@@ -70,7 +75,7 @@ public class NettyHttp2ProtocolSelectorHandler extends 
SimpleChannelInboundHandl
         }
         ChannelPipeline pipeline = ctx.pipeline();
         Http2TransportListener http2TransportListener = 
factory.newInstance(h2StreamChannel, url, frameworkModel);
-        ctx.channel().closeFuture().addListener(future -> 
http2TransportListener.onStreamClosed());
+        ctx.channel().closeFuture().addListener(future -> 
http2TransportListener.close());
         pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, 
http2TransportListener));
         pipeline.remove(this);
         ctx.fireChannelRead(metadata);
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
index 2c42b3f7bd..1bd9545b46 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHeaderEnum.java
@@ -45,8 +45,7 @@ public enum TripleHeaderEnum {
     SERVICE_GROUP("tri-service-group"),
     SERVICE_TIMEOUT("tri-service-timeout"),
     TRI_HEADER_CONVERT("tri-header-convert"),
-    TRI_EXCEPTION_CODE("tri-exception-code"),
-    ;
+    TRI_EXCEPTION_CODE("tri-exception-code");
 
     static final Map<String, TripleHeaderEnum> enumMap = new HashMap<>();
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 415e8d7a31..cee75e3ec3 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -97,6 +97,7 @@ public class TripleHttp2Protocol extends AbstractWireProtocol 
implements ScopeMo
                         .maxFrameSize(tripleConfig.getMaxFrameSize())
                         
.maxHeaderListSize(tripleConfig.getMaxHeaderListSize()))
                 .frameLogger(CLIENT_LOGGER)
+                .validateHeaders(false)
                 .build();
         //        
codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter());
         List<ChannelHandler> handlers = new ArrayList<>();
@@ -142,11 +143,11 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
                 protocol -> {
                     if 
(AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, 
protocol)) {
                         return new Http2ServerUpgradeCodec(
-                                buildHttp2FrameCodec(url),
+                                buildHttp2FrameCodec(tripleConfig),
                                 new HttpWriteQueueHandler(),
                                 new FlushConsolidationHandler(64, true),
                                 new TripleServerConnectionHandler(),
-                                buildHttp2MultiplexHandler(url),
+                                buildHttp2MultiplexHandler(url, tripleConfig),
                                 new TripleTailHandler());
                     }
                     // Not upgrade request
@@ -159,24 +160,25 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
         handlers.add(new ChannelHandlerPretender(new 
HttpObjectAggregator(tripleConfig.getMaxBodySize())));
         handlers.add(new ChannelHandlerPretender(new NettyHttp1Codec()));
         handlers.add(new ChannelHandlerPretender(new 
NettyHttp1ConnectionHandler(
-                url, frameworkModel, 
DefaultHttp11ServerTransportListenerFactory.INSTANCE)));
+                url, frameworkModel, tripleConfig, 
DefaultHttp11ServerTransportListenerFactory.INSTANCE)));
     }
 
-    private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url) {
+    private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url, 
TripleConfig tripleConfig) {
         return new Http2MultiplexHandler(new 
ChannelInitializer<Http2StreamChannel>() {
             @Override
             protected void initChannel(Http2StreamChannel ch) {
                 final ChannelPipeline p = ch.pipeline();
                 p.addLast(new NettyHttp2FrameCodec());
                 p.addLast(new NettyHttp2ProtocolSelectorHandler(
-                        url, frameworkModel, 
GenericHttp2ServerTransportListenerFactory.INSTANCE));
+                        url, frameworkModel, tripleConfig, 
GenericHttp2ServerTransportListenerFactory.INSTANCE));
             }
         });
     }
 
     private void configurerHttp2Handlers(URL url, List<ChannelHandler> 
handlers) {
-        final Http2FrameCodec codec = buildHttp2FrameCodec(url);
-        final Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url);
+        TripleConfig tripleConfig = getTripleConfig(url);
+        final Http2FrameCodec codec = buildHttp2FrameCodec(tripleConfig);
+        final Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url, 
tripleConfig);
         handlers.add(new ChannelHandlerPretender(new HttpWriteQueueHandler()));
         handlers.add(new ChannelHandlerPretender(codec));
         handlers.add(new ChannelHandlerPretender(new 
FlushConsolidationHandler(64, true)));
@@ -185,8 +187,7 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
         handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
     }
 
-    private Http2FrameCodec buildHttp2FrameCodec(URL url) {
-        TripleConfig tripleConfig = getTripleConfig(url);
+    private Http2FrameCodec buildHttp2FrameCodec(TripleConfig tripleConfig) {
         return TripleHttp2FrameCodecBuilder.forServer()
                 .customizeConnection((connection) ->
                         connection.remote().flowController(new 
TriHttp2RemoteFlowController(connection, tripleConfig)))
@@ -198,6 +199,7 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
                         .maxFrameSize(tripleConfig.getMaxFrameSize())
                         
.maxHeaderListSize(tripleConfig.getMaxHeaderListSize()))
                 .frameLogger(SERVER_LOGGER)
+                .validateHeaders(false)
                 .build();
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
index 9ae7881302..1b25dc57a1 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
@@ -26,6 +26,7 @@ import 
org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
 import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
 import org.apache.dubbo.remoting.http12.h2.Http2Header;
+import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver;
 import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
 import org.apache.dubbo.remoting.http12.message.MethodMetadata;
 import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
@@ -75,11 +76,20 @@ public class GrpcHttp2ServerTransportListener extends 
GenericHttp2ServerTranspor
         return new GrpcStreamingDecoder();
     }
 
+    @Override
+    protected Http2ServerChannelObserver newHttp2ServerChannelObserver(
+            FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) {
+        return new GrpcServerChannelObserver(frameworkModel, h2StreamChannel);
+    }
+
     @Override
     protected HttpMessageListener buildHttpMessageListener() {
         return getContext().isHasStub() ? super.buildHttpMessageListener() : 
new LazyFindMethodListener();
     }
 
+    @Override
+    protected void onUnary() {}
+
     @Override
     protected void onMetadataCompletion(Http2Header metadata) {
         super.onMetadataCompletion(metadata);
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcServerChannelObserver.java
similarity index 59%
copy from 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
copy to 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcServerChannelObserver.java
index 4cb0342642..5b044c400f 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpChannelObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcServerChannelObserver.java
@@ -14,11 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.remoting.http12;
+package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
 
-import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import 
org.apache.dubbo.rpc.protocol.tri.h12.http2.Http2ServerCallToObserverAdapter;
 
-public interface HttpChannelObserver<T> extends StreamObserver<T> {
+public class GrpcServerChannelObserver extends 
Http2ServerCallToObserverAdapter {
 
-    HttpChannel getHttpChannel();
+    public GrpcServerChannelObserver(FrameworkModel frameworkModel, 
H2StreamChannel h2StreamChannel) {
+        super(frameworkModel, h2StreamChannel);
+    }
+
+    @Override
+    protected void doOnError(Throwable throwable) {}
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
index 950a2d9350..3f9406a3cc 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
@@ -108,6 +108,11 @@ public class DefaultHttp11ServerTransportListener
         serverChannelObserver.onError(throwable);
     }
 
+    @Override
+    public void close() throws Exception {
+        serverChannelObserver.close();
+    }
+
     private static class AutoCompleteUnaryServerCallListener extends 
UnaryServerCallListener {
 
         public AutoCompleteUnaryServerCallListener(
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index a44fefcd47..9c46944ccf 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -61,8 +61,9 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
 
     private final ExecutorSupport executorSupport;
     private final StreamingDecoder streamingDecoder;
-    private final Http2ServerChannelObserver serverChannelObserver;
-
+    private final FrameworkModel frameworkModel;
+    private final H2StreamChannel h2StreamChannel;
+    private Http2ServerChannelObserver serverChannelObserver;
     private ServerCallListener serverCallListener;
 
     public GenericHttp2ServerTransportListener(
@@ -71,15 +72,22 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
         executorSupport = 
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel())
                 .getExecutorSupport(url);
         streamingDecoder = newStreamingDecoder();
-        serverChannelObserver = new 
Http2ServerCallToObserverAdapter(frameworkModel, h2StreamChannel);
+        serverChannelObserver = newHttp2ServerChannelObserver(frameworkModel, 
h2StreamChannel);
         serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE);
         serverChannelObserver.setStreamingDecoder(streamingDecoder);
+        this.frameworkModel = frameworkModel;
+        this.h2StreamChannel = h2StreamChannel;
     }
 
     protected StreamingDecoder newStreamingDecoder() {
         return new DefaultStreamingDecoder();
     }
 
+    protected Http2ServerChannelObserver newHttp2ServerChannelObserver(
+            FrameworkModel frameworkModel, H2StreamChannel h2StreamChannel) {
+        return new Http2ServerCallToObserverAdapter(frameworkModel, 
h2StreamChannel);
+    }
+
     @Override
     protected Executor initializeExecutor(Http2Header metadata) {
         return new SerializingExecutor(executorSupport.getExecutor(metadata));
@@ -114,11 +122,9 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
 
     private ServerCallListener startListener(
             RpcInvocation invocation, MethodDescriptor methodDescriptor, 
Invoker<?> invoker) {
-        Http2ServerChannelObserver responseObserver = 
getServerChannelObserver();
-        CancellationContext cancellationContext = 
RpcContext.getCancellationContext();
-        responseObserver.setCancellationContext(cancellationContext);
         switch (methodDescriptor.getRpcType()) {
             case UNARY:
+                onUnary();
                 boolean applyCustomizeException = false;
                 if (!getContext().isHasStub()) {
                     MethodMetadata methodMetadata = 
getContext().getMethodMetadata();
@@ -127,19 +133,34 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
                             methodMetadata.getActualRequestTypes(),
                             methodMetadata.getActualResponseType());
                 }
-                UnaryServerCallListener unaryServerCallListener = 
startUnary(invocation, invoker, responseObserver);
+                onListenerStart();
+                UnaryServerCallListener unaryServerCallListener =
+                        startUnary(invocation, invoker, 
getServerChannelObserver());
                 
unaryServerCallListener.setApplyCustomizeException(applyCustomizeException);
                 return unaryServerCallListener;
             case SERVER_STREAM:
-                return startServerStreaming(invocation, invoker, 
responseObserver);
+                onListenerStart();
+                return startServerStreaming(invocation, invoker, 
getServerChannelObserver());
             case BI_STREAM:
             case CLIENT_STREAM:
-                return startBiStreaming(invocation, invoker, responseObserver);
+                onListenerStart();
+                return startBiStreaming(invocation, invoker, 
getServerChannelObserver());
             default:
                 throw new IllegalStateException("Can not reach here");
         }
     }
 
+    protected void onUnary() {
+        serverChannelObserver = new 
Http2ServerUnaryChannelObserver(frameworkModel, h2StreamChannel);
+        serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE);
+        serverChannelObserver.setStreamingDecoder(streamingDecoder);
+    }
+
+    protected void onListenerStart() {
+        CancellationContext cancellationContext = 
RpcContext.getCancellationContext();
+        serverChannelObserver.setCancellationContext(cancellationContext);
+    }
+
     private UnaryServerCallListener startUnary(
             RpcInvocation invocation, Invoker<?> invoker, 
Http2ServerChannelObserver responseObserver) {
         return new UnaryServerCallListener(invocation, invoker, 
responseObserver);
@@ -201,9 +222,8 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
     }
 
     @Override
-    public void onStreamClosed() {
-        // doing on event loop thread
-        getServerChannelObserver().onStreamClosed();
+    public void close() throws Exception {
+        getServerChannelObserver().close();
     }
 
     private static class Http2StreamingDecodeListener implements 
ListeningDecoder.Listener {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java
new file mode 100644
index 0000000000..10d2c2a530
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ServerUnaryChannelObserver.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.h12.http2;
+
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+
+public class Http2ServerUnaryChannelObserver extends 
Http2ServerCallToObserverAdapter {
+
+    public Http2ServerUnaryChannelObserver(FrameworkModel frameworkModel, 
H2StreamChannel h2StreamChannel) {
+        super(frameworkModel, h2StreamChannel);
+    }
+
+    @Override
+    public void doOnNext(Object data) throws Throwable {
+        HttpOutputMessage httpOutputMessage = buildMessage(data);
+        sendHeader(buildMetadata(resolveStatusCode(data), data, 
httpOutputMessage));
+        sendMessage(httpOutputMessage);
+    }
+
+    @Override
+    public void doOnError(Throwable throwable) throws Throwable {
+        String statusCode = resolveStatusCode(throwable);
+        Object data = buildErrorResponse(statusCode, throwable);
+        HttpOutputMessage httpOutputMessage = buildMessage(data);
+        sendHeader(buildMetadata(statusCode, data, httpOutputMessage));
+        sendMessage(httpOutputMessage);
+    }
+
+    @Override
+    protected void doOnCompleted(Throwable throwable) {}
+
+    @Override
+    protected HttpOutputMessage encodeHttpOutputMessage(Object data) {
+        return getHttpChannel().newOutputMessage(true);
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java
index 23462b7b6b..448a2b7db4 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/RestHttpMessageCodec.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.remoting.http12.HttpRequest;
 import org.apache.dubbo.remoting.http12.HttpResponse;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageDecoder;
 import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
 import org.apache.dubbo.remoting.http12.message.MediaType;
@@ -108,6 +109,8 @@ public final class RestHttpMessageCodec implements 
HttpMessageDecoder, HttpMessa
                 if (messageEncoder.mediaType().isPureText() && type != 
String.class) {
                     data = typeConverter.convert(data, String.class);
                 }
+            } catch (HttpStatusException e) {
+                throw e;
             } catch (Exception e) {
                 throw new EncodeException(e);
             }

Reply via email to