This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 37420ec6d6 IGNITE-23374 Java thin: Fix Netty buffer leak (#4514)
37420ec6d6 is described below

commit 37420ec6d64793a7e082b058a963199c169d678f
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Oct 8 10:24:44 2024 +0300

    IGNITE-23374 Java thin: Fix Netty buffer leak (#4514)
    
    * Restore `try (var unpacker = new ClientMessageUnpacker(buf))` in 
`onMessage` (entry point)
    * Add `retain()` where needed
    
    Adding `retain()` in places where the buffer escapes current thread is 
easier than hunting leaks (forgotten `release` calls).
---
 .../ignite/internal/client/TcpClientChannel.java   | 53 +++++++++-------------
 .../ignite/internal/client/table/ClientTable.java  |  2 +-
 .../org/apache/ignite/client/ConnectionTest.java   |  4 +-
 3 files changed, 25 insertions(+), 34 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 80012c5436..072a78f756 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -275,9 +275,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     /** {@inheritDoc} */
     @Override
     public void onMessage(ByteBuf buf) {
-        try {
-            var unpacker = new ClientMessageUnpacker(buf);
-
+        try (var unpacker = new ClientMessageUnpacker(buf)) {
             processNextMessage(unpacker);
         } catch (Throwable t) {
             close(t, false);
@@ -318,9 +316,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                 notificationHandlers.put(id, notificationFut);
             }
 
-            CompletableFuture<T> fut = send(opCode, id, payloadWriter, 
payloadReader, notificationFut, operationTimeout);
-
-            return fut;
+            return send(opCode, id, payloadWriter, payloadReader, 
notificationFut, operationTimeout);
 
         } catch (Throwable t) {
             return failedFuture(t);
@@ -384,25 +380,24 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             });
 
             if (PublicApiThreading.executingSyncPublicApi()) {
+                // We are in the public API (user) thread, deserialize the 
response here.
                 try {
-                    ClientMessageUnpacker unpacker = fut.get();
+                    ClientMessageUnpacker unpacker = fut.join();
 
                     return completedFuture(complete(payloadReader, 
notificationFut, unpacker));
-                } catch (Exception e) {
-                    return failedFuture(e);
+                } catch (Throwable t) {
+                    throw sneakyThrow(ViewUtils.ensurePublicException(t));
                 }
             }
 
-            return fut.handleAsync(
-                    (unpacker, err) -> {
-                        if (err != null) {
-                            throw sneakyThrow(err);
-                        }
+            // Handle the response in the async continuation pool.
+            return fut.handleAsync((unpacker, err) -> {
+                if (err != null) {
+                    throw sneakyThrow(ViewUtils.ensurePublicException(err));
+                }
 
-                        return complete(payloadReader, notificationFut, 
unpacker);
-                    },
-                    asyncContinuationExecutor
-            );
+                return complete(payloadReader, notificationFut, unpacker);
+            }, asyncContinuationExecutor);
         } catch (Throwable t) {
             log.warn("Failed to send request [id=" + id + ", op=" + opCode + 
", remoteAddress=" + cfg.getAddress() + "]: "
                     + t.getMessage(), t);
@@ -424,22 +419,22 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
      * @param notificationFut Notify future.
      * @param unpacker Unpacked message.
      */
-    private <T> T complete(
-            PayloadReader<T> payloadReader,
-            CompletableFuture<PayloadInputChannel> notificationFut,
+    private <T> @Nullable T complete(
+            @Nullable PayloadReader<T> payloadReader,
+            @Nullable CompletableFuture<PayloadInputChannel> notificationFut,
             ClientMessageUnpacker unpacker
     ) {
         try (unpacker) {
             if (payloadReader != null) {
                 return payloadReader.apply(new PayloadInputChannel(this, 
unpacker, notificationFut));
             }
+
+            return null;
         } catch (Throwable e) {
             log.error("Failed to deserialize server response [remoteAddress=" 
+ cfg.getAddress() + "]: " + e.getMessage(), e);
 
             throw new IgniteException(PROTOCOL_ERR, "Failed to deserialize 
server response: " + e.getMessage(), e);
         }
-
-        return null;
     }
 
     /**
@@ -448,7 +443,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     private void processNextMessage(ClientMessageUnpacker unpacker) throws 
IgniteException {
         if (protocolCtx == null) {
             // Process handshake.
-            pendingReqs.remove(-1L).future().complete(unpacker);
+            pendingReqs.remove(-1L).future().complete(unpacker.retain());
             return;
         }
 
@@ -469,8 +464,6 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         TimeoutObjectImpl pendingReq = pendingReqs.remove(resId);
 
         if (pendingReq == null) {
-            unpacker.close();
-
             log.error("Unexpected response ID [remoteAddress=" + 
cfg.getAddress() + "]: " + resId);
 
             throw new IgniteClientConnectionException(PROTOCOL_ERR, 
String.format("Unexpected response ID [%s]", resId), endpoint());
@@ -481,13 +474,11 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         if (err == null) {
             metrics.requestsCompletedIncrement();
 
-            pendingReq.future().complete(unpacker);
+            pendingReq.future().complete(unpacker.retain());
         } else {
             metrics.requestsFailedIncrement();
             notificationHandlers.remove(resId);
 
-            unpacker.close();
-
             pendingReq.future().completeExceptionally(err);
         }
     }
@@ -517,7 +508,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             throw new IgniteClientConnectionException(PROTOCOL_ERR, 
String.format("Unexpected notification ID [%s]", id), endpoint());
         }
 
-        try (unpacker) {
+        try {
             if (err != null) {
                 handler.completeExceptionally(err);
             } else {
@@ -820,7 +811,7 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
          * @param timeout Timeout in milliseconds.
          * @param fut Target future.
          */
-        public TimeoutObjectImpl(long timeout, 
CompletableFuture<ClientMessageUnpacker> fut) {
+        private TimeoutObjectImpl(long timeout, 
CompletableFuture<ClientMessageUnpacker> fut) {
             this.endTime = timeout > 0 ? coarseCurrentTimeMillis() + timeout : 
0;
             this.fut = fut;
         }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 8ab926ef08..58b99427fd 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -576,7 +576,7 @@ public class ClientTable implements Table {
         assert in != null;
         assert schemaId != null;
 
-        var resFut = getSchema(schemaId).thenApply(schema -> fn.apply(schema, 
in));
+        CompletableFuture<T> resFut = getSchema(schemaId).thenApply(schema -> 
fn.apply(schema, in));
 
         // Close unpacker.
         resFut.handle((tuple, err) -> {
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index e129bb9104..d9e5f26d90 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -79,7 +79,7 @@ public class ConnectionTest extends AbstractClientTest {
 
         // It does not seem possible to verify that it's a 'Connection 
refused' exception because with different
         // user locales the message differs, so let's just check that the 
message ends with the known suffix.
-        assertThat(errMsg, endsWith(" [endpoint=127.0.0.1:47500]"));
+        assertThat(errMsg, endsWith(":47500]"));
     }
 
     @Test
@@ -147,7 +147,7 @@ public class ConnectionTest extends AbstractClientTest {
             assertThrowsWithCause(
                     clientBuilder::build,
                     IgniteClientConnectionException.class,
-                    "Handshake timeout [endpoint=127.0.0.1:" + 
testServer.port() + "]"
+                    "Handshake timeout [endpoint=127.0.0.1"
             );
 
             testServer.enableClientRequestHandling();

Reply via email to