This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 11900330d9 fix NettyCodecAdapter decoder memory leak (#14538)
11900330d9 is described below

commit 11900330d91f528cfcbdc4755bdd23651b516f52
Author: jiangyuan <[email protected]>
AuthorDate: Wed Aug 28 15:30:14 2024 +0800

    fix NettyCodecAdapter decoder memory leak (#14538)
    
    * fix NettyCodecAdapter decoder memory leak
    
    * fix format
    
    * update comment
---
 .../transport/netty4/NettyCodecAdapter.java        | 42 ++++++++++++----------
 .../transport/netty4/NettyCodecAdapterTest.java    | 39 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 19 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
index 6001213c78..894d19c73c 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
@@ -94,26 +94,30 @@ public final class NettyCodecAdapter {
         protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
 
             ChannelBuffer message = new NettyBackedChannelBuffer(input);
-
-            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
-
-            // decode object.
-            do {
-                int saveReaderIndex = message.readerIndex();
-                Object msg = codec.decode(channel, message);
-                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
-                    message.readerIndex(saveReaderIndex);
-                    break;
-                } else {
-                    // is it possible to go here ?
-                    if (saveReaderIndex == message.readerIndex()) {
-                        throw new IOException("Decode without read data.");
-                    }
-                    if (msg != null) {
-                        out.add(msg);
+            try {
+                NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+
+                // decode object.
+                do {
+                    int saveReaderIndex = message.readerIndex();
+                    Object msg = codec.decode(channel, message);
+                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
+                        message.readerIndex(saveReaderIndex);
+                        break;
+                    } else {
+                        // is it possible to go here ?
+                        if (saveReaderIndex == message.readerIndex()) {
+                            throw new IOException("Decode without read data.");
+                        }
+                        if (msg != null) {
+                            out.add(msg);
+                        }
                     }
-                }
-            } while (message.readable());
+                } while (message.readable());
+            } catch (Throwable t) {
+                message.skipBytes(message.readableBytes());
+                throw t;
+            }
         }
     }
 }
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapterTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapterTest.java
index 0a28ed3b13..7ffb0cc5a5 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapterTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapterTest.java
@@ -19,13 +19,26 @@ package org.apache.dubbo.remoting.transport.netty4;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.Constants;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import io.netty.buffer.AbstractByteBufAllocator;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.DecoderException;
 import io.netty.handler.codec.MessageToByteEncoder;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
 /**
  * {@link NettyCodecAdapter}
  */
@@ -42,4 +55,30 @@ class NettyCodecAdapterTest {
         Assertions.assertTrue(decoder instanceof ByteToMessageDecoder);
         Assertions.assertTrue(encoder instanceof MessageToByteEncoder);
     }
+
+    @Test
+    void testDecodeException() throws IOException {
+        Codec2 codec2 = Mockito.mock(Codec2.class);
+        doThrow(new IOException("testDecodeIllegalPacket")).when(codec2).decode(any(), any());
+
+        URL url = Mockito.mock(URL.class);
+        doReturn("default").when(url).getParameter(eq(Constants.CODEC_KEY));
+
+        ChannelHandler handler = Mockito.mock(ChannelHandler.class);
+        NettyCodecAdapter nettyCodecAdapter = new NettyCodecAdapter(codec2, url, handler);
+        io.netty.channel.ChannelHandler decoder = nettyCodecAdapter.getDecoder();
+        EmbeddedChannel embeddedChannel = new EmbeddedChannel();
+        embeddedChannel.pipeline().addLast(decoder);
+
+        // simulate illegal data packet
+        ByteBuf input = AbstractByteBufAllocator.DEFAULT.buffer();
+        input.writeBytes("testDecodeIllegalPacket".getBytes(StandardCharsets.UTF_8));
+
+        DecoderException decoderException = Assertions.assertThrows(DecoderException.class, () -> {
+            embeddedChannel.writeInbound(input);
+        });
+        Assertions.assertTrue(decoderException.getCause() instanceof IOException);
+
+        Assertions.assertEquals(0, input.refCnt());
+    }
 }

Reply via email to