This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 37420ec6d6 IGNITE-23374 Java thin: Fix Netty buffer leak (#4514)
37420ec6d6 is described below
commit 37420ec6d64793a7e082b058a963199c169d678f
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Oct 8 10:24:44 2024 +0300
IGNITE-23374 Java thin: Fix Netty buffer leak (#4514)
* Restore `try (var unpacker = new ClientMessageUnpacker(buf))` in
`onMessage` (entry point)
* Add `retain()` where needed
Adding `retain()` in places where the buffer escapes current thread is
easier than hunting leaks (forgotten `release` calls).
---
.../ignite/internal/client/TcpClientChannel.java | 53 +++++++++-------------
.../ignite/internal/client/table/ClientTable.java | 2 +-
.../org/apache/ignite/client/ConnectionTest.java | 4 +-
3 files changed, 25 insertions(+), 34 deletions(-)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 80012c5436..072a78f756 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -275,9 +275,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** {@inheritDoc} */
@Override
public void onMessage(ByteBuf buf) {
- try {
- var unpacker = new ClientMessageUnpacker(buf);
-
+ try (var unpacker = new ClientMessageUnpacker(buf)) {
processNextMessage(unpacker);
} catch (Throwable t) {
close(t, false);
@@ -318,9 +316,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
notificationHandlers.put(id, notificationFut);
}
- CompletableFuture<T> fut = send(opCode, id, payloadWriter,
payloadReader, notificationFut, operationTimeout);
-
- return fut;
+ return send(opCode, id, payloadWriter, payloadReader,
notificationFut, operationTimeout);
} catch (Throwable t) {
return failedFuture(t);
@@ -384,25 +380,24 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
});
if (PublicApiThreading.executingSyncPublicApi()) {
+ // We are in the public API (user) thread, deserialize the
response here.
try {
- ClientMessageUnpacker unpacker = fut.get();
+ ClientMessageUnpacker unpacker = fut.join();
return completedFuture(complete(payloadReader,
notificationFut, unpacker));
- } catch (Exception e) {
- return failedFuture(e);
+ } catch (Throwable t) {
+ throw sneakyThrow(ViewUtils.ensurePublicException(t));
}
}
- return fut.handleAsync(
- (unpacker, err) -> {
- if (err != null) {
- throw sneakyThrow(err);
- }
+ // Handle the response in the async continuation pool.
+ return fut.handleAsync((unpacker, err) -> {
+ if (err != null) {
+ throw sneakyThrow(ViewUtils.ensurePublicException(err));
+ }
- return complete(payloadReader, notificationFut,
unpacker);
- },
- asyncContinuationExecutor
- );
+ return complete(payloadReader, notificationFut, unpacker);
+ }, asyncContinuationExecutor);
} catch (Throwable t) {
log.warn("Failed to send request [id=" + id + ", op=" + opCode +
", remoteAddress=" + cfg.getAddress() + "]: "
+ t.getMessage(), t);
@@ -424,22 +419,22 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
* @param notificationFut Notify future.
* @param unpacker Unpacked message.
*/
- private <T> T complete(
- PayloadReader<T> payloadReader,
- CompletableFuture<PayloadInputChannel> notificationFut,
+ private <T> @Nullable T complete(
+ @Nullable PayloadReader<T> payloadReader,
+ @Nullable CompletableFuture<PayloadInputChannel> notificationFut,
ClientMessageUnpacker unpacker
) {
try (unpacker) {
if (payloadReader != null) {
return payloadReader.apply(new PayloadInputChannel(this,
unpacker, notificationFut));
}
+
+ return null;
} catch (Throwable e) {
log.error("Failed to deserialize server response [remoteAddress="
+ cfg.getAddress() + "]: " + e.getMessage(), e);
throw new IgniteException(PROTOCOL_ERR, "Failed to deserialize
server response: " + e.getMessage(), e);
}
-
- return null;
}
/**
@@ -448,7 +443,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
private void processNextMessage(ClientMessageUnpacker unpacker) throws
IgniteException {
if (protocolCtx == null) {
// Process handshake.
- pendingReqs.remove(-1L).future().complete(unpacker);
+ pendingReqs.remove(-1L).future().complete(unpacker.retain());
return;
}
@@ -469,8 +464,6 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
TimeoutObjectImpl pendingReq = pendingReqs.remove(resId);
if (pendingReq == null) {
- unpacker.close();
-
log.error("Unexpected response ID [remoteAddress=" +
cfg.getAddress() + "]: " + resId);
throw new IgniteClientConnectionException(PROTOCOL_ERR,
String.format("Unexpected response ID [%s]", resId), endpoint());
@@ -481,13 +474,11 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
if (err == null) {
metrics.requestsCompletedIncrement();
- pendingReq.future().complete(unpacker);
+ pendingReq.future().complete(unpacker.retain());
} else {
metrics.requestsFailedIncrement();
notificationHandlers.remove(resId);
- unpacker.close();
-
pendingReq.future().completeExceptionally(err);
}
}
@@ -517,7 +508,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
throw new IgniteClientConnectionException(PROTOCOL_ERR,
String.format("Unexpected notification ID [%s]", id), endpoint());
}
- try (unpacker) {
+ try {
if (err != null) {
handler.completeExceptionally(err);
} else {
@@ -820,7 +811,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
* @param timeout Timeout in milliseconds.
* @param fut Target future.
*/
- public TimeoutObjectImpl(long timeout,
CompletableFuture<ClientMessageUnpacker> fut) {
+ private TimeoutObjectImpl(long timeout,
CompletableFuture<ClientMessageUnpacker> fut) {
this.endTime = timeout > 0 ? coarseCurrentTimeMillis() + timeout :
0;
this.fut = fut;
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 8ab926ef08..58b99427fd 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -576,7 +576,7 @@ public class ClientTable implements Table {
assert in != null;
assert schemaId != null;
- var resFut = getSchema(schemaId).thenApply(schema -> fn.apply(schema,
in));
+ CompletableFuture<T> resFut = getSchema(schemaId).thenApply(schema ->
fn.apply(schema, in));
// Close unpacker.
resFut.handle((tuple, err) -> {
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index e129bb9104..d9e5f26d90 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -79,7 +79,7 @@ public class ConnectionTest extends AbstractClientTest {
// It does not seem possible to verify that it's a 'Connection
refused' exception because with different
// user locales the message differs, so let's just check that the
message ends with the known suffix.
- assertThat(errMsg, endsWith(" [endpoint=127.0.0.1:47500]"));
+ assertThat(errMsg, endsWith(":47500]"));
}
@Test
@@ -147,7 +147,7 @@ public class ConnectionTest extends AbstractClientTest {
assertThrowsWithCause(
clientBuilder::build,
IgniteClientConnectionException.class,
- "Handshake timeout [endpoint=127.0.0.1:" +
testServer.port() + "]"
+ "Handshake timeout [endpoint=127.0.0.1"
);
testServer.enableClientRequestHandling();