This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 3ca35045a0 Release ByteBuf when handle onData failed (#13102)
3ca35045a0 is described below
commit 3ca35045a091c71a051914dadf82e72628009a16
Author: TomlongTK <[email protected]>
AuthorDate: Tue Oct 10 10:28:15 2023 +0800
Release ByteBuf when handle onData failed (#13102)
* Release ByteBuf when handle onData failed
* Runnable with clean function
* Fix license
* Release ByteBuf in onData method, and refactor some sonar issue.
* Remove unused dependency
---------
Co-authored-by: earthchen <[email protected]>
Co-authored-by: Albumen Kevin <[email protected]>
---
.../protocol/tri/stream/TripleClientStream.java | 46 +++++++++++++---------
.../protocol/tri/stream/TripleServerStream.java | 31 +++++++++------
2 files changed, 46 insertions(+), 31 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
index 99f993566d..da7ddc4543 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleClientStream.java
@@ -237,9 +237,7 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
void finishProcess(TriRpcStatus status, Http2Headers trailers, boolean
isReturnTriException) {
final Map<String, String> reserved =
filterReservedHeaders(trailers);
- final Map<String, Object> attachments = headersToMap(trailers, ()
-> {
- return
reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader());
- });
+ final Map<String, Object> attachments = headersToMap(trailers, ()
-> reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()));
final TriRpcStatus detailStatus;
final TriRpcStatus statusFromTrailers =
getStatusFromTrailers(reserved);
if (statusFromTrailers != null) {
@@ -449,23 +447,33 @@ public class TripleClientStream extends AbstractStream
implements ClientStream {
@Override
public void onData(ByteBuf data, boolean endStream) {
- executor.execute(() -> {
- if (transportError != null) {
- transportError.appendDescription(
- "Data:" + data.toString(StandardCharsets.UTF_8));
- ReferenceCountUtil.release(data);
- if (transportError.description.length() > 512 ||
endStream) {
- handleH2TransportError(transportError);
- }
- return;
- }
- if (!headerReceived) {
-
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
- "headers not received before payload"));
- return;
+ try {
+ executor.execute(() -> doOnData(data, endStream));
+ } catch (Throwable t) {
+ // Tasks will be rejected when the thread pool is closed or
full,
+ // ByteBuf needs to be released to avoid out of heap memory
leakage.
+ // For example, ThreadLessExecutor will be shutdown when
request timeout {@link AsyncRpcResult}
+ ReferenceCountUtil.release(data);
+ LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", "submit onData
task failed", t);
+ }
+ }
+
+ private void doOnData(ByteBuf data, boolean endStream) {
+ if (transportError != null) {
+ transportError.appendDescription(
+ "Data:" + data.toString(StandardCharsets.UTF_8));
+ ReferenceCountUtil.release(data);
+ if (transportError.description.length() > 512 || endStream) {
+ handleH2TransportError(transportError);
}
- deframer.deframe(data);
- });
+ return;
+ }
+ if (!headerReceived) {
+ handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
+ "headers not received before payload"));
+ return;
+ }
+ deframer.deframe(data);
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
index be9bfc4951..88e5cd0b3d 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
@@ -17,9 +17,10 @@
package org.apache.dubbo.rpc.protocol.tri.stream;
+import io.netty.util.ReferenceCountUtil;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.HeaderFilter;
@@ -69,10 +70,11 @@ import java.util.Optional;
import java.util.concurrent.Executor;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
public class TripleServerStream extends AbstractStream implements ServerStream
{
- private static final Logger LOGGER =
LoggerFactory.getLogger(TripleServerStream.class);
+ private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(TripleServerStream.class);
public final ServerTransportObserver transportObserver = new
ServerTransportObserver();
private final TripleWriteQueue writeQueue;
private final PathResolver pathResolver;
@@ -408,11 +410,10 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
}
}
- Map<String, Object> requestMetadata = headersToMap(headers, () -> {
- return
Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
- .map(CharSequence::toString)
- .orElse(null);
- });
+ Map<String, Object> requestMetadata = headersToMap(headers, () ->
+
Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
+ .map(CharSequence::toString)
+ .orElse(null));
boolean hasStub = pathResolver.hasNativeStub(path);
if (hasStub) {
listener = new StubAbstractServerCall(invoker,
TripleServerStream.this,
@@ -431,7 +432,15 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
@Override
public void onData(ByteBuf data, boolean endStream) {
- executor.execute(() -> doOnData(data, endStream));
+ try {
+ executor.execute(() -> doOnData(data, endStream));
+ } catch (Throwable t) {
+ // Tasks will be rejected when the thread pool is closed or
full,
+ // ByteBuf needs to be released to avoid out of heap memory
leakage.
+ // For example, ThreadLessExecutor will be shutdown when
request timeout {@link AsyncRpcResult}
+ ReferenceCountUtil.release(data);
+ LOGGER.error(PROTOCOL_FAILED_REQUEST, "", "", "submit onData
task failed", t);
+ }
}
private void doOnData(ByteBuf data, boolean endStream) {
@@ -454,10 +463,8 @@ public class TripleServerStream extends AbstractStream
implements ServerStream {
if (listener == null) {
return;
}
- executor.execute(() -> {
- listener.onCancelByRemote(TriRpcStatus.CANCELLED
- .withDescription("Canceled by client ,errorCode=" +
errorCode));
- });
+ executor.execute(() ->
listener.onCancelByRemote(TriRpcStatus.CANCELLED
+ .withDescription("Canceled by client ,errorCode=" +
errorCode)));
}
}