This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 027dcea777 IGNITE-19560 Java client: fix Netty buffer leak (#2229)
027dcea777 is described below

commit 027dcea777f3cd9dbb03ceeb95a384e55fb523ad
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Jun 21 17:18:18 2023 +0300

    IGNITE-19560 Java client: fix Netty buffer leak (#2229)
    
    * Fix certain leak in `TcpClientChannel`
    * Fix potential leak in `ClientInboundMessageHandler`
    * Improve leak detection further
---
 .../ignite/client/handler/ClientInboundMessageHandler.java       | 9 +++++++--
 .../java/org/apache/ignite/internal/client/TcpClientChannel.java | 8 +++++++-
 .../test/java/org/apache/ignite/client/AbstractClientTest.java   | 4 ++++
 .../src/test/java/org/apache/ignite/client/TestServer.java       | 3 +++
 modules/runner/build.gradle                                      | 1 +
 .../ignite/internal/runner/app/PlatformTestNodeRunner.java       | 2 ++
 6 files changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 80e0ac3a1b..392a3f707f 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -359,8 +359,13 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         var buf = packer.getBuffer();
         int bytes = buf.readableBytes();
 
-        // writeAndFlush releases pooled buffer.
-        ctx.writeAndFlush(buf);
+        try {
+            // writeAndFlush releases pooled buffer.
+            ctx.writeAndFlush(buf);
+        } catch (Throwable t) {
+            buf.release();
+            throw t;
+        }
 
         metrics.bytesSentAdd(bytes);
     }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 9d7c6931de..edeaca4976 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -401,7 +401,13 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         }
 
         if (unpacker.tryUnpackNil()) {
-            pendingReq.complete(unpacker);
+            boolean completed = pendingReq.complete(unpacker);
+
+            if (!completed) {
+                // Already completed (timeout, error, closing channel).
+                unpacker.close();
+            }
+
             metrics.requestsCompletedIncrement();
         } else {
             Throwable err = readError(unpacker);
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 51dc45ddc8..3b788855af 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -75,6 +75,10 @@ public abstract class AbstractClientTest {
     public static void afterAll() throws Exception {
         client.close();
         testServer.close();
+
+        // Force GC to detect Netty buffer leaks.
+        // noinspection CallToSystemGC
+        System.gc();
     }
 
     /**
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java 
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index c6caa5e881..6f7485cea7 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -22,6 +22,7 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 
+import io.netty.util.ResourceLeakDetector;
 import java.io.IOError;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -111,6 +112,8 @@ public class TestServer implements AutoCloseable {
             @Nullable AuthenticationConfiguration authenticationConfiguration,
             @Nullable Integer port
     ) {
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+
         generator = new 
ConfigurationTreeGenerator(ClientConnectorConfiguration.KEY, 
NetworkConfiguration.KEY);
         cfg = new ConfigurationRegistry(
                 List.of(ClientConnectorConfiguration.KEY, 
NetworkConfiguration.KEY),
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index f8d9eb6d45..d90251e74a 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -157,6 +157,7 @@ dependencies {
     }
     integrationTestImplementation libs.typesafe.config
     integrationTestImplementation libs.auto.service.annotations
+    integrationTestImplementation libs.netty.common
 
     testFixturesImplementation project(':ignite-api')
     testFixturesImplementation project(':ignite-sql-engine')
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index d238ba42c7..31e19d7cc9 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.escapeWindowsPath;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getResourcePath;
 
+import io.netty.util.ResourceLeakDetector;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -175,6 +176,7 @@ public class PlatformTestNodeRunner {
      */
     public static void main(String[] args) throws Exception {
         System.out.println("Starting test node runner...");
+        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
 
         for (int i = 0; i < args.length; i++) {
             System.out.println("Arg " + i + ": " + args[i]);

Reply via email to