This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 027dcea777 IGNITE-19560 Java client: fix Netty buffer leak (#2229)
027dcea777 is described below
commit 027dcea777f3cd9dbb03ceeb95a384e55fb523ad
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Jun 21 17:18:18 2023 +0300
IGNITE-19560 Java client: fix Netty buffer leak (#2229)
* Fix certain leak in `TcpClientChannel`
* Fix potential leak in `ClientInboundMessageHandler`
* Improve leak detection further
---
.../ignite/client/handler/ClientInboundMessageHandler.java | 9 +++++++--
.../java/org/apache/ignite/internal/client/TcpClientChannel.java | 8 +++++++-
.../test/java/org/apache/ignite/client/AbstractClientTest.java | 4 ++++
.../src/test/java/org/apache/ignite/client/TestServer.java | 3 +++
modules/runner/build.gradle | 1 +
.../ignite/internal/runner/app/PlatformTestNodeRunner.java | 2 ++
6 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 80e0ac3a1b..392a3f707f 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -359,8 +359,13 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im
var buf = packer.getBuffer();
int bytes = buf.readableBytes();
- // writeAndFlush releases pooled buffer.
- ctx.writeAndFlush(buf);
+ try {
+ // writeAndFlush releases pooled buffer.
+ ctx.writeAndFlush(buf);
+ } catch (Throwable t) {
+ buf.release();
+ throw t;
+ }
metrics.bytesSentAdd(bytes);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 9d7c6931de..edeaca4976 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -401,7 +401,13 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
}
if (unpacker.tryUnpackNil()) {
- pendingReq.complete(unpacker);
+ boolean completed = pendingReq.complete(unpacker);
+
+ if (!completed) {
+ // Already completed (timeout, error, closing channel).
+ unpacker.close();
+ }
+
metrics.requestsCompletedIncrement();
} else {
Throwable err = readError(unpacker);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 51dc45ddc8..3b788855af 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -75,6 +75,10 @@ public abstract class AbstractClientTest {
public static void afterAll() throws Exception {
client.close();
testServer.close();
+
+ // Force GC to detect Netty buffer leaks.
+ // noinspection CallToSystemGC
+ System.gc();
}
/**
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index c6caa5e881..6f7485cea7 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -22,6 +22,7 @@ import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import io.netty.util.ResourceLeakDetector;
import java.io.IOError;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -111,6 +112,8 @@ public class TestServer implements AutoCloseable {
@Nullable AuthenticationConfiguration authenticationConfiguration,
@Nullable Integer port
) {
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+
generator = new
ConfigurationTreeGenerator(ClientConnectorConfiguration.KEY,
NetworkConfiguration.KEY);
cfg = new ConfigurationRegistry(
List.of(ClientConnectorConfiguration.KEY,
NetworkConfiguration.KEY),
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index f8d9eb6d45..d90251e74a 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -157,6 +157,7 @@ dependencies {
}
integrationTestImplementation libs.typesafe.config
integrationTestImplementation libs.auto.service.annotations
+ integrationTestImplementation libs.netty.common
testFixturesImplementation project(':ignite-api')
testFixturesImplementation project(':ignite-sql-engine')
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index d238ba42c7..31e19d7cc9 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -23,6 +23,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.escapeWindowsPath;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getResourcePath;
+import io.netty.util.ResourceLeakDetector;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -175,6 +176,7 @@ public class PlatformTestNodeRunner {
*/
public static void main(String[] args) throws Exception {
System.out.println("Starting test node runner...");
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
for (int i = 0; i < args.length; i++) {
System.out.println("Arg " + i + ": " + args[i]);