This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 935849f4ce Optimize the decoding of generic http2 (#14175)
935849f4ce is described below

commit 935849f4ce0eed1893869711fe42b84d22e31c84
Author: TomlongTK <[email protected]>
AuthorDate: Thu May 23 11:17:31 2024 +0800

    Optimize the decoding of generic http2 (#14175)
    
    * Optimize the decoding of generic http2
    
    * Decode on close
    
    * Clean up netty residual memory when stream is closed
---
 ...ngDecoder.java => DefaultStreamingDecoder.java} | 37 +++++++++++++++++++---
 .../h12/grpc/GrpcHttp2ServerTransportListener.java | 14 --------
 .../http2/GenericHttp2ServerTransportListener.java | 18 +++++++++--
 3 files changed, 47 insertions(+), 22 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/NoOpStreamingDecoder.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/DefaultStreamingDecoder.java
similarity index 59%
rename from dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/NoOpStreamingDecoder.java
rename to dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/DefaultStreamingDecoder.java
index 473e6a2ed0..939bbefc74 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/NoOpStreamingDecoder.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/DefaultStreamingDecoder.java
@@ -16,13 +16,19 @@
  */
 package org.apache.dubbo.remoting.http12.message;
 
+import org.apache.dubbo.remoting.http12.CompositeInputStream;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 
+import java.io.IOException;
 import java.io.InputStream;
 
-public class NoOpStreamingDecoder implements StreamingDecoder {
+public class DefaultStreamingDecoder implements StreamingDecoder {
 
-    private FragmentListener listener;
+    private boolean closed;
+
+    protected final CompositeInputStream accumulate = new CompositeInputStream();
+
+    protected FragmentListener listener;
 
     @Override
     public void request(int numMessages) {
@@ -31,17 +37,38 @@ public class NoOpStreamingDecoder implements StreamingDecoder {
 
     @Override
     public void decode(InputStream inputStream) throws DecodeException {
-        listener.onFragmentMessage(inputStream);
+        if (closed) {
+            // ignored
+            return;
+        }
+        accumulate.addInputStream(inputStream);
     }
 
     @Override
     public void close() {
-        this.listener.onClose();
+        try {
+            if (!closed) {
+                closed = true;
+                listener.onFragmentMessage(accumulate);
+                accumulate.close();
+                listener.onClose();
+            }
+        } catch (IOException e) {
+            throw new DecodeException(e);
+        }
     }
 
     @Override
     public void onStreamClosed() {
-        // do nothing
+        if (closed) {
+            return;
+        }
+        closed = true;
+        try {
+            accumulate.close();
+        } catch (IOException e) {
+            throw new DecodeException(e);
+        }
     }
 
     @Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
index 0399c64334..9ae7881302 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcHttp2ServerTransportListener.java
@@ -26,7 +26,6 @@ import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
 import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
 import org.apache.dubbo.remoting.http12.h2.Http2Header;
-import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
 import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
 import org.apache.dubbo.remoting.http12.message.MethodMetadata;
 import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
@@ -123,19 +122,6 @@ public class GrpcHttp2ServerTransportListener extends GenericHttp2ServerTranspor
         return invocation;
     }
 
-    @Override
-    protected void onError(Http2InputMessage message, Throwable throwable) {
-        try {
-            message.close();
-        } catch (Exception e) {
-            throwable.addSuppressed(e);
-        }
-        onError(throwable);
-    }
-
-    @Override
-    protected void onFinally(Http2InputMessage message) {}
-
     @Override
     protected GrpcStreamingDecoder getStreamingDecoder() {
         return (GrpcStreamingDecoder) super.getStreamingDecoder();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 5be8bfcf68..9db2971837 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -28,9 +28,9 @@ import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
 import org.apache.dubbo.remoting.http12.h2.Http2ServerChannelObserver;
 import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
 import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
+import org.apache.dubbo.remoting.http12.message.DefaultStreamingDecoder;
 import org.apache.dubbo.remoting.http12.message.ListeningDecoder;
 import org.apache.dubbo.remoting.http12.message.MethodMetadata;
-import org.apache.dubbo.remoting.http12.message.NoOpStreamingDecoder;
 import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
 import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
 import org.apache.dubbo.rpc.CancellationContext;
@@ -77,8 +77,7 @@ public class GenericHttp2ServerTransportListener extends AbstractServerTransport
     }
 
     protected StreamingDecoder newStreamingDecoder() {
-        // default no op
-        return new NoOpStreamingDecoder();
+        return new DefaultStreamingDecoder();
     }
 
     @Override
@@ -174,6 +173,19 @@ public class GenericHttp2ServerTransportListener extends AbstractServerTransport
         serverChannelObserver.onError(throwable);
     }
 
+    @Override
+    protected void onError(Http2InputMessage message, Throwable throwable) {
+        try {
+            message.close();
+        } catch (Exception e) {
+            throwable.addSuppressed(e);
+        }
+        onError(throwable);
+    }
+
+    @Override
+    protected void onFinally(Http2InputMessage message) {}
+
     @Override
     public void cancelByRemote(long errorCode) {
         serverChannelObserver.cancel(CancelStreamException.fromRemote(errorCode));

Reply via email to