This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 3ca35045a0 Release ByteBuf when handle onData failed (#13102)
3ca35045a0 is described below

commit 3ca35045a091c71a051914dadf82e72628009a16
Author: TomlongTK <[email protected]>
AuthorDate: Tue Oct 10 10:28:15 2023 +0800

    Release ByteBuf when handle onData failed (#13102)
    
    * Release ByteBuf when handle onData failed
    
    * Runnable with clean function
    
    * Fix license
    
    * Release ByteBuf in the onData method, and fix some Sonar issues.
    
    * Remove unused dependency
    
    ---------
    
    Co-authored-by: earthchen <[email protected]>
    Co-authored-by: Albumen Kevin <[email protected]>
---
 .../protocol/tri/stream/TripleClientStream.java    | 46 +++++++++++++---------
 .../protocol/tri/stream/TripleServerStream.java    | 31 +++++++++------
 2 files changed, 46 insertions(+), 31 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
index 99f993566d..da7ddc4543 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
@@ -237,9 +237,7 @@ public class TripleClientStream extends AbstractStream implements ClientStream {
 
         void finishProcess(TriRpcStatus status, Http2Headers trailers, boolean 
isReturnTriException) {
             final Map<String, String> reserved = 
filterReservedHeaders(trailers);
-            final Map<String, Object> attachments = headersToMap(trailers, () 
-> {
-                return 
reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader());
-            });
+            final Map<String, Object> attachments = headersToMap(trailers, () 
-> reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()));
             final TriRpcStatus detailStatus;
             final TriRpcStatus statusFromTrailers = 
getStatusFromTrailers(reserved);
             if (statusFromTrailers != null) {
@@ -449,23 +447,33 @@ public class TripleClientStream extends AbstractStream implements ClientStream {
 
         @Override
         public void onData(ByteBuf data, boolean endStream) {
-            executor.execute(() -> {
-                if (transportError != null) {
-                    transportError.appendDescription(
-                        "Data:" + data.toString(StandardCharsets.UTF_8));
-                    ReferenceCountUtil.release(data);
-                    if (transportError.description.length() > 512 || 
endStream) {
-                        handleH2TransportError(transportError);
-                    }
-                    return;
-                }
-                if (!headerReceived) {
-                    
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
-                        "headers not received before payload"));
-                    return;
+            try {
+                executor.execute(() -> doOnData(data, endStream));
+            } catch (Throwable t) {
+                // Tasks will be rejected when the thread pool is closed or 
full,
+                // ByteBuf needs to be released to avoid out of heap memory 
leakage.
+                // For example, ThreadLessExecutor will be shutdown when 
request timeout {@link AsyncRpcResult}
+                ReferenceCountUtil.release(data);
+                LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", "submit onData 
task failed", t);
+            }
+        }
+
+        private void doOnData(ByteBuf data, boolean endStream) {
+            if (transportError != null) {
+                transportError.appendDescription(
+                    "Data:" + data.toString(StandardCharsets.UTF_8));
+                ReferenceCountUtil.release(data);
+                if (transportError.description.length() > 512 || endStream) {
+                    handleH2TransportError(transportError);
                 }
-                deframer.deframe(data);
-            });
+                return;
+            }
+            if (!headerReceived) {
+                handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
+                    "headers not received before payload"));
+                return;
+            }
+            deframer.deframe(data);
         }
 
         @Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
index be9bfc4951..88e5cd0b3d 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
@@ -17,9 +17,10 @@
 
 package org.apache.dubbo.rpc.protocol.tri.stream;
 
+import io.netty.util.ReferenceCountUtil;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.rpc.HeaderFilter;
@@ -69,10 +70,11 @@ import java.util.Optional;
 import java.util.concurrent.Executor;
 
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
 
 public class TripleServerStream extends AbstractStream implements ServerStream 
{
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(TripleServerStream.class);
+    private static final ErrorTypeAwareLogger LOGGER = 
LoggerFactory.getErrorTypeAwareLogger(TripleServerStream.class);
     public final ServerTransportObserver transportObserver = new 
ServerTransportObserver();
     private final TripleWriteQueue writeQueue;
     private final PathResolver pathResolver;
@@ -408,11 +410,10 @@ public class TripleServerStream extends AbstractStream implements ServerStream {
                 }
             }
 
-            Map<String, Object> requestMetadata = headersToMap(headers, () -> {
-                return 
Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
-                    .map(CharSequence::toString)
-                    .orElse(null);
-            });
+            Map<String, Object> requestMetadata = headersToMap(headers, () ->
+                
Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
+                .map(CharSequence::toString)
+                .orElse(null));
             boolean hasStub = pathResolver.hasNativeStub(path);
             if (hasStub) {
                 listener = new StubAbstractServerCall(invoker, 
TripleServerStream.this,
@@ -431,7 +432,15 @@ public class TripleServerStream extends AbstractStream implements ServerStream {
 
         @Override
         public void onData(ByteBuf data, boolean endStream) {
-            executor.execute(() -> doOnData(data, endStream));
+            try {
+                executor.execute(() -> doOnData(data, endStream));
+            } catch (Throwable t) {
+                // Tasks will be rejected when the thread pool is closed or 
full,
+                // ByteBuf needs to be released to avoid out of heap memory 
leakage.
+                // For example, ThreadLessExecutor will be shutdown when 
request timeout {@link AsyncRpcResult}
+                ReferenceCountUtil.release(data);
+                LOGGER.error(PROTOCOL_FAILED_REQUEST, "", "", "submit onData 
task failed", t);
+            }
         }
 
         private void doOnData(ByteBuf data, boolean endStream) {
@@ -454,10 +463,8 @@ public class TripleServerStream extends AbstractStream implements ServerStream {
             if (listener == null) {
                 return;
             }
-            executor.execute(() -> {
-                listener.onCancelByRemote(TriRpcStatus.CANCELLED
-                    .withDescription("Canceled by client ,errorCode=" + 
errorCode));
-            });
+            executor.execute(() -> 
listener.onCancelByRemote(TriRpcStatus.CANCELLED
+                .withDescription("Canceled by client ,errorCode=" + 
errorCode)));
         }
     }
 

Reply via email to