This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 2c38cab1d2 fix memory leak of tri protocol (#13973)
2c38cab1d2 is described below

commit 2c38cab1d2bb8b1c6d49fab3e93690bbde4817fb
Author: icodening <[email protected]>
AuthorDate: Tue May 14 13:58:01 2024 +0800

    fix memory leak of tri protocol (#13973)
    
    * fix memory leak of tri protocol
    
    * fix memory leak of tri protocol
    
    * fix memory leak of tri protocol
    
    * fix memory leak of tri protocol
    
    * fix memory leak of tri protocol
    
    * fix memory leak of tri protocol
    
    * fixup! fix memory leak of tri protocol
    
    * fix memory leak of tri protocol
    
    * fix memory leak of tri protocol
    
    ---------
    
    Co-authored-by: earthchen <[email protected]>
---
 ...ortListener.java => CancelStreamException.java} | 31 +++++++++++++++++++++-
 .../http12/h2/Http2ServerChannelObserver.java      | 25 ++++++++++++++++-
 .../remoting/http12/h2/Http2TransportListener.java |  5 +++-
 .../message/LengthFieldStreamingDecoder.java       |  3 +++
 .../h2/NettyHttp2ProtocolSelectorHandler.java      |  6 +++--
 .../http2/GenericHttp2ServerTransportListener.java | 10 +++++--
 6 files changed, 73 insertions(+), 7 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java
similarity index 50%
copy from 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
copy to 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java
index 16531e7a93..62fdee1e6e 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java
@@ -16,4 +16,33 @@
  */
 package org.apache.dubbo.remoting.http12.h2;
 
-public interface Http2TransportListener extends 
CancelableTransportListener<Http2Header, Http2InputMessage> {}
+import org.apache.dubbo.remoting.http12.ErrorCodeHolder;
+
+public class CancelStreamException extends RuntimeException implements 
ErrorCodeHolder {
+
+    private final boolean cancelByRemote;
+
+    private final long errorCode;
+
+    private CancelStreamException(boolean cancelByRemote, long errorCode) {
+        this.cancelByRemote = cancelByRemote;
+        this.errorCode = errorCode;
+    }
+
+    public boolean isCancelByRemote() {
+        return cancelByRemote;
+    }
+
+    public static CancelStreamException fromRemote(long errorCode) {
+        return new CancelStreamException(true, errorCode);
+    }
+
+    public static CancelStreamException fromLocal(long errorCode) {
+        return new CancelStreamException(false, errorCode);
+    }
+
+    @Override
+    public long getErrorCode() {
+        return errorCode;
+    }
+}
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index c9fa6af4bb..89d77965d6 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -37,6 +37,8 @@ public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserve
 
     private boolean autoRequestN = true;
 
+    private boolean closed = false;
+
     public Http2ServerChannelObserver(H2StreamChannel h2StreamChannel) {
         super(h2StreamChannel);
     }
@@ -74,12 +76,33 @@ public class Http2ServerChannelObserver extends 
AbstractServerHttpChannelObserve
 
     @Override
     public void cancel(Throwable throwable) {
+        if (throwable instanceof CancelStreamException) {
+            if (((CancelStreamException) throwable).isCancelByRemote()) {
+                closed = true;
+            }
+        }
+        this.cancellationContext.cancel(throwable);
         long errorCode = 0;
         if (throwable instanceof ErrorCodeHolder) {
             errorCode = ((ErrorCodeHolder) throwable).getErrorCode();
         }
         getHttpChannel().writeResetFrame(errorCode);
-        this.cancellationContext.cancel(throwable);
+    }
+
+    @Override
+    public void doOnNext(Object data) throws Throwable {
+        if (closed) {
+            return;
+        }
+        super.doOnNext(data);
+    }
+
+    @Override
+    public void doOnError(Throwable throwable) throws Throwable {
+        if (closed) {
+            return;
+        }
+        super.doOnError(throwable);
     }
 
     @Override
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
index 16531e7a93..09ad7fe422 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
@@ -16,4 +16,7 @@
  */
 package org.apache.dubbo.remoting.http12.h2;
 
-public interface Http2TransportListener extends 
CancelableTransportListener<Http2Header, Http2InputMessage> {}
+public interface Http2TransportListener extends 
CancelableTransportListener<Http2Header, Http2InputMessage> {
+
+    void onStreamClosed();
+}
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
index c401cfeacd..4e0a087ccd 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
@@ -95,6 +95,9 @@ public class LengthFieldStreamingDecoder implements 
StreamingDecoder {
         if (inDelivery) {
             return;
         }
+        if (closed) {
+            return;
+        }
         inDelivery = true;
         try {
             // Process the uncompressed bytes.
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
index cc82019a7e..582b656272 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
@@ -24,6 +24,7 @@ import 
org.apache.dubbo.remoting.http12.command.HttpWriteQueue;
 import 
org.apache.dubbo.remoting.http12.exception.UnsupportedMediaTypeException;
 import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
 import org.apache.dubbo.remoting.http12.h2.Http2ServerTransportListenerFactory;
+import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
 import org.apache.dubbo.remoting.http12.h2.command.Http2WriteQueueChannel;
 import org.apache.dubbo.remoting.http12.netty4.HttpWriteQueueHandler;
 import org.apache.dubbo.rpc.model.FrameworkModel;
@@ -68,8 +69,9 @@ public class NettyHttp2ProtocolSelectorHandler extends 
SimpleChannelInboundHandl
             h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel, 
writeQueue);
         }
         ChannelPipeline pipeline = ctx.pipeline();
-        pipeline.addLast(
-                new NettyHttp2FrameHandler(h2StreamChannel, 
factory.newInstance(h2StreamChannel, url, frameworkModel)));
+        Http2TransportListener http2TransportListener = 
factory.newInstance(h2StreamChannel, url, frameworkModel);
+        ctx.channel().closeFuture().addListener(future -> 
http2TransportListener.onStreamClosed());
+        pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, 
http2TransportListener));
         pipeline.remove(this);
         ctx.fireChannelRead(metadata);
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 4fee48dfb4..4821563a09 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -20,7 +20,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
 import org.apache.dubbo.remoting.http12.HttpMethods;
-import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
+import org.apache.dubbo.remoting.http12.h2.CancelStreamException;
 import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
 import org.apache.dubbo.remoting.http12.h2.Http2Header;
 import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
@@ -176,7 +176,7 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
 
     @Override
     public void cancelByRemote(long errorCode) {
-        serverChannelObserver.cancel(new HttpStatusException((int) errorCode));
+        
serverChannelObserver.cancel(CancelStreamException.fromRemote(errorCode));
         serverCallListener.onCancel(errorCode);
     }
 
@@ -188,6 +188,12 @@ public class GenericHttp2ServerTransportListener extends 
AbstractServerTransport
         return serverChannelObserver;
     }
 
+    @Override
+    public void onStreamClosed() {
+        // doing on event loop thread
+        getStreamingDecoder().close();
+    }
+
     private static class Http2StreamingDecodeListener implements 
ListeningDecoder.Listener {
 
         private final ServerCallListener serverCallListener;

Reply via email to