This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 11900330d9 fix NettyCodecAdapter decoder memory leak (#14538)
11900330d9 is described below
commit 11900330d91f528cfcbdc4755bdd23651b516f52
Author: jiangyuan <[email protected]>
AuthorDate: Wed Aug 28 15:30:14 2024 +0800
fix NettyCodecAdapter decoder memory leak (#14538)
* fix NettyCodecAdapter decoder memory leak
* fix format
* update comment
---
.../transport/netty4/NettyCodecAdapter.java | 42 ++++++++++++----------
.../transport/netty4/NettyCodecAdapterTest.java | 39 ++++++++++++++++++++
2 files changed, 62 insertions(+), 19 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
index 6001213c78..894d19c73c 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
@@ -94,26 +94,30 @@ public final class NettyCodecAdapter {
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
-
- NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
-
- // decode object.
- do {
- int saveReaderIndex = message.readerIndex();
- Object msg = codec.decode(channel, message);
- if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
- message.readerIndex(saveReaderIndex);
- break;
- } else {
- // is it possible to go here ?
- if (saveReaderIndex == message.readerIndex()) {
- throw new IOException("Decode without read data.");
- }
- if (msg != null) {
- out.add(msg);
+ try {
+ NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+
+ // decode object.
+ do {
+ int saveReaderIndex = message.readerIndex();
+ Object msg = codec.decode(channel, message);
+ if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
+ message.readerIndex(saveReaderIndex);
+ break;
+ } else {
+ // is it possible to go here ?
+ if (saveReaderIndex == message.readerIndex()) {
+ throw new IOException("Decode without read data.");
+ }
+ if (msg != null) {
+ out.add(msg);
+ }
}
- }
- } while (message.readable());
+ } while (message.readable());
+ } catch (Throwable t) {
+ message.skipBytes(message.readableBytes());
+ throw t;
+ }
}
}
}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapterTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapterTest.java
index 0a28ed3b13..7ffb0cc5a5 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapterTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapterTest.java
@@ -19,13 +19,26 @@ package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.Constants;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import io.netty.buffer.AbstractByteBufAllocator;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.MessageToByteEncoder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
/**
* {@link NettyCodecAdapter}
*/
@@ -42,4 +55,30 @@ class NettyCodecAdapterTest {
Assertions.assertTrue(decoder instanceof ByteToMessageDecoder);
Assertions.assertTrue(encoder instanceof MessageToByteEncoder);
}
+
+ @Test
+ void testDecodeException() throws IOException {
+ Codec2 codec2 = Mockito.mock(Codec2.class);
+ doThrow(new IOException("testDecodeIllegalPacket")).when(codec2).decode(any(), any());
+
+ URL url = Mockito.mock(URL.class);
+ doReturn("default").when(url).getParameter(eq(Constants.CODEC_KEY));
+
+ ChannelHandler handler = Mockito.mock(ChannelHandler.class);
+ NettyCodecAdapter nettyCodecAdapter = new NettyCodecAdapter(codec2, url, handler);
+ io.netty.channel.ChannelHandler decoder = nettyCodecAdapter.getDecoder();
+ EmbeddedChannel embeddedChannel = new EmbeddedChannel();
+ embeddedChannel.pipeline().addLast(decoder);
+
+ // simulate illegal data packet
+ ByteBuf input = AbstractByteBufAllocator.DEFAULT.buffer();
+ input.writeBytes("testDecodeIllegalPacket".getBytes(StandardCharsets.UTF_8));
+
+ DecoderException decoderException = Assertions.assertThrows(DecoderException.class, () -> {
+ embeddedChannel.writeInbound(input);
+ });
+ Assertions.assertTrue(decoderException.getCause() instanceof IOException);
+
+ Assertions.assertEquals(0, input.refCnt());
+ }
}