This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 2c38cab1d2 fix memory leak of tri protocol (#13973)
2c38cab1d2 is described below
commit 2c38cab1d2bb8b1c6d49fab3e93690bbde4817fb
Author: icodening <[email protected]>
AuthorDate: Tue May 14 13:58:01 2024 +0800
fix memory leak of tri protocol (#13973)
* fix memory leak of tri protocol
* fix memory leak of tri protocol
* fix memory leak of tri protocol
* fix memory leak of tri protocol
* fix memory leak of tri protocol
* fix memory leak of tri protocol
* fixup! fix memory leak of tri protocol
* fix memory leak of tri protocol
* fix memory leak of tri protocol
---------
Co-authored-by: earthchen <[email protected]>
---
...ortListener.java => CancelStreamException.java} | 31 +++++++++++++++++++++-
.../http12/h2/Http2ServerChannelObserver.java | 25 ++++++++++++++++-
.../remoting/http12/h2/Http2TransportListener.java | 5 +++-
.../message/LengthFieldStreamingDecoder.java | 3 +++
.../h2/NettyHttp2ProtocolSelectorHandler.java | 6 +++--
.../http2/GenericHttp2ServerTransportListener.java | 10 +++++--
6 files changed, 73 insertions(+), 7 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java
similarity index 50%
copy from
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
copy to
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java
index 16531e7a93..62fdee1e6e 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java
@@ -16,4 +16,33 @@
*/
package org.apache.dubbo.remoting.http12.h2;
-public interface Http2TransportListener extends
CancelableTransportListener<Http2Header, Http2InputMessage> {}
+import org.apache.dubbo.remoting.http12.ErrorCodeHolder;
+
+public class CancelStreamException extends RuntimeException implements
ErrorCodeHolder {
+
+ private final boolean cancelByRemote;
+
+ private final long errorCode;
+
+ private CancelStreamException(boolean cancelByRemote, long errorCode) {
+ this.cancelByRemote = cancelByRemote;
+ this.errorCode = errorCode;
+ }
+
+ public boolean isCancelByRemote() {
+ return cancelByRemote;
+ }
+
+ public static CancelStreamException fromRemote(long errorCode) {
+ return new CancelStreamException(true, errorCode);
+ }
+
+ public static CancelStreamException fromLocal(long errorCode) {
+ return new CancelStreamException(false, errorCode);
+ }
+
+ @Override
+ public long getErrorCode() {
+ return errorCode;
+ }
+}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
index c9fa6af4bb..89d77965d6 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
@@ -37,6 +37,8 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
private boolean autoRequestN = true;
+ private boolean closed = false;
+
public Http2ServerChannelObserver(H2StreamChannel h2StreamChannel) {
super(h2StreamChannel);
}
@@ -74,12 +76,33 @@ public class Http2ServerChannelObserver extends
AbstractServerHttpChannelObserve
@Override
public void cancel(Throwable throwable) {
+ if (throwable instanceof CancelStreamException) {
+ if (((CancelStreamException) throwable).isCancelByRemote()) {
+ closed = true;
+ }
+ }
+ this.cancellationContext.cancel(throwable);
long errorCode = 0;
if (throwable instanceof ErrorCodeHolder) {
errorCode = ((ErrorCodeHolder) throwable).getErrorCode();
}
getHttpChannel().writeResetFrame(errorCode);
- this.cancellationContext.cancel(throwable);
+ }
+
+ @Override
+ public void doOnNext(Object data) throws Throwable {
+ if (closed) {
+ return;
+ }
+ super.doOnNext(data);
+ }
+
+ @Override
+ public void doOnError(Throwable throwable) throws Throwable {
+ if (closed) {
+ return;
+ }
+ super.doOnError(throwable);
}
@Override
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
index 16531e7a93..09ad7fe422 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java
@@ -16,4 +16,7 @@
*/
package org.apache.dubbo.remoting.http12.h2;
-public interface Http2TransportListener extends
CancelableTransportListener<Http2Header, Http2InputMessage> {}
+public interface Http2TransportListener extends
CancelableTransportListener<Http2Header, Http2InputMessage> {
+
+ void onStreamClosed();
+}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
index c401cfeacd..4e0a087ccd 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
@@ -95,6 +95,9 @@ public class LengthFieldStreamingDecoder implements
StreamingDecoder {
if (inDelivery) {
return;
}
+ if (closed) {
+ return;
+ }
inDelivery = true;
try {
// Process the uncompressed bytes.
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
index cc82019a7e..582b656272 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java
@@ -24,6 +24,7 @@ import
org.apache.dubbo.remoting.http12.command.HttpWriteQueue;
import
org.apache.dubbo.remoting.http12.exception.UnsupportedMediaTypeException;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2ServerTransportListenerFactory;
+import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.h2.command.Http2WriteQueueChannel;
import org.apache.dubbo.remoting.http12.netty4.HttpWriteQueueHandler;
import org.apache.dubbo.rpc.model.FrameworkModel;
@@ -68,8 +69,9 @@ public class NettyHttp2ProtocolSelectorHandler extends
SimpleChannelInboundHandl
h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel,
writeQueue);
}
ChannelPipeline pipeline = ctx.pipeline();
- pipeline.addLast(
- new NettyHttp2FrameHandler(h2StreamChannel,
factory.newInstance(h2StreamChannel, url, frameworkModel)));
+ Http2TransportListener http2TransportListener =
factory.newInstance(h2StreamChannel, url, frameworkModel);
+ ctx.channel().closeFuture().addListener(future ->
http2TransportListener.onStreamClosed());
+ pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel,
http2TransportListener));
pipeline.remove(this);
ctx.fireChannelRead(metadata);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 4fee48dfb4..4821563a09 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -20,7 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
import org.apache.dubbo.remoting.http12.HttpMethods;
-import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
+import org.apache.dubbo.remoting.http12.h2.CancelStreamException;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessage;
@@ -176,7 +176,7 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
@Override
public void cancelByRemote(long errorCode) {
- serverChannelObserver.cancel(new HttpStatusException((int) errorCode));
+
serverChannelObserver.cancel(CancelStreamException.fromRemote(errorCode));
serverCallListener.onCancel(errorCode);
}
@@ -188,6 +188,12 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
return serverChannelObserver;
}
+ @Override
+ public void onStreamClosed() {
+ // doing on event loop thread
+ getStreamingDecoder().close();
+ }
+
private static class Http2StreamingDecodeListener implements
ListeningDecoder.Listener {
private final ServerCallListener serverCallListener;