This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch jdbc_over_thin_sql
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/jdbc_over_thin_sql by this
push:
new 5e53a78b5e5 IGNITE-26789 Jdbc. Fix resource leak when client
disconnects (#6879)
5e53a78b5e5 is described below
commit 5e53a78b5e5177540b1e4dbf93cf282eff2b7608
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Tue Nov 4 16:05:16 2025 +0300
IGNITE-26789 Jdbc. Fix resource leak when client disconnects (#6879)
---
.../handler/requests/sql/ClientSqlCommon.java | 32 +++++++++++++++++----
.../requests/sql/ClientSqlCursorCloseRequest.java | 14 ++++++++-
.../sql/ClientSqlCursorNextPageRequest.java | 6 ++--
.../sql/ClientSqlCursorNextResultRequest.java | 3 +-
.../requests/sql/ClientSqlExecuteRequest.java | 2 +-
.../jdbc2/ItJdbcClusterPerIntegrationTest.java | 13 ---------
.../ignite/jdbc/ItJdbcMultiStatementSelfTest.java | 33 ++++++++++++++--------
.../ignite/internal/jdbc2/ResultSetWrapper.java | 16 +++++------
8 files changed, 76 insertions(+), 43 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
index ff3d3e7f6a7..c5ae6f11eeb 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -38,7 +40,6 @@ import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import
org.apache.ignite.internal.sql.engine.prepare.partitionawareness.PartitionAwarenessMetadata;
-import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnMetadata.ColumnOrigin;
import org.apache.ignite.sql.ResultSetMetadata;
@@ -254,11 +255,12 @@ class ClientSqlCommon {
int pageSize,
boolean includePartitionAwarenessMeta,
boolean sqlDirectTxMappingSupported,
- boolean sqlMultiStatementSupported
+ boolean sqlMultiStatementSupported,
+ Executor executor
) {
try {
Long nextResultResourceId = sqlMultiStatementSupported &&
asyncResultSet.cursor().hasNextResult()
- ?
saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize,
resources)
+ ?
saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize,
resources, executor)
: null;
if ((asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages()))
{
@@ -295,15 +297,35 @@ class ClientSqlCommon {
private static Long saveNextResultResource(
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> nextResultFuture,
int pageSize,
- ClientResourceRegistry resources
+ ClientResourceRegistry resources,
+ Executor executor
) throws IgniteInternalCheckedException {
ClientResource resource = new ClientResource(
new CursorWithPageSize(nextResultFuture, pageSize),
- () -> nextResultFuture.thenApply(AsyncCursor::closeAsync));
+ () -> nextResultFuture.thenAccept(cur ->
iterateThroughResultsAndCloseThem(cur, executor))
+ );
return resources.put(resource);
}
+ private static void
iterateThroughResultsAndCloseThem(AsyncSqlCursor<InternalSqlRow> cursor,
Executor executor) {
+ Function<AsyncSqlCursor<InternalSqlRow>,
CompletableFuture<AsyncSqlCursor<InternalSqlRow>>> traverser = new Function<>()
{
+ @Override
+ public CompletableFuture<AsyncSqlCursor<InternalSqlRow>>
apply(AsyncSqlCursor<InternalSqlRow> cur) {
+ return cur.closeAsync()
+ .thenComposeAsync(none -> {
+ if (cur.hasNextResult()) {
+ return cur.nextResult().thenComposeAsync(this,
executor);
+ } else {
+ return CompletableFuture.completedFuture(cur);
+ }
+ }, executor);
+ }
+ };
+
+ CompletableFuture.completedFuture(cursor).thenCompose(traverser);
+ }
+
private static void writeResultSet(
ClientMessagePacker out,
AsyncResultSetImpl res,
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
index d2eea6926f9..5e72923d1a7 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
@@ -18,10 +18,13 @@
package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientResource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.util.CompletableFutures;
/**
* Client SQL cursor close request.
@@ -39,8 +42,17 @@ public class ClientSqlCursorCloseRequest {
ClientResourceRegistry resources
) throws IgniteInternalCheckedException {
long resourceId = in.unpackLong();
+ ClientResource resource;
- ClientSqlResultSet asyncResultSet =
resources.remove(resourceId).get(ClientSqlResultSet.class);
+ try {
+ resource = resources.remove(resourceId);
+ } catch (IgniteInternalCheckedException | IgniteInternalException
ignored) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-26927 Do not
completely ignore "resource not found" exception
+ // Ignore: either resource was already removed during asynchronous
data fetch request, or registry is closing.
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ ClientSqlResultSet asyncResultSet =
resource.get(ClientSqlResultSet.class);
return asyncResultSet.closeAsync().thenApply(v -> null);
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
index 86a2cbf21c1..d4b234c5010 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
@@ -25,6 +25,7 @@ import
org.apache.ignite.client.handler.ClientResourceRegistry;
import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
@@ -51,8 +52,9 @@ public class ClientSqlCursorNextPageRequest {
if (!r.hasMorePages()) {
try {
resources.remove(resourceId);
- } catch (IgniteInternalCheckedException ignored) {
- // Ignore: either resource already removed, or
registry is closing.
+ } catch (IgniteInternalCheckedException |
IgniteInternalException ignored) {
+ // TODO
https://issues.apache.org/jira/browse/IGNITE-26927 Handle "resource not found"
exception properly
+ // Ignore: either resource already removed by
concurrent close operation, or registry is closing.
}
return resultSet.closeAsync().thenApply(v ->
getResponseWriter(r));
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
index 613ea270992..0c6025aad9e 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
@@ -59,7 +59,8 @@ public class ClientSqlCursorNextResultRequest {
pageSize
)
).thenCompose(asyncResultSet ->
- ClientSqlCommon.writeResultSetAsync(resources,
asyncResultSet, metrics, pageSize, false, false, true)
+ ClientSqlCommon.writeResultSetAsync(resources,
asyncResultSet, metrics, pageSize, false, false, true,
+ operationExecutor)
).thenApply(rsWriter -> rsWriter), operationExecutor);
f.whenCompleteAsync((r, t) -> {
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 758bc01a494..418c9238d5e 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -127,7 +127,7 @@ public class ClientSqlExecuteRequest {
arguments
).thenCompose(asyncResultSet ->
ClientSqlCommon.writeResultSetAsync(resources,
asyncResultSet, metrics, props.pageSize(),
- includePartitionAwarenessMeta,
sqlDirectTxMappingSupported, sqlMultistatementsSupported))
+ includePartitionAwarenessMeta,
sqlDirectTxMappingSupported, sqlMultistatementsSupported, operationExecutor))
.thenApply(rsWriter -> out -> {
if (tx != null) {
writeTxMeta(out, timestampTracker, clockService, tx,
resIdHolder[0]);
diff --git
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/internal/jdbc2/ItJdbcClusterPerIntegrationTest.java
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/internal/jdbc2/ItJdbcClusterPerIntegrationTest.java
index 06d37f12872..87ca5ea7671 100644
---
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/internal/jdbc2/ItJdbcClusterPerIntegrationTest.java
+++
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/internal/jdbc2/ItJdbcClusterPerIntegrationTest.java
@@ -26,7 +26,6 @@ import java.sql.Statement;
import java.util.List;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.jdbc.ItJdbcStatementSelfTest;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -48,7 +47,6 @@ public class ItJdbcClusterPerIntegrationTest extends
ClusterPerTestIntegrationTe
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26789")
public void noScriptResourcesAfterExecutingFailingScript() throws
Exception {
for (String statement : STATEMENTS) {
log.info("Run statement: {}", statement);
@@ -58,23 +56,13 @@ public class ItJdbcClusterPerIntegrationTest extends
ClusterPerTestIntegrationTe
stmt.execute(statement);
// Connection should close the statement
}
- // Fails with IGN-CMN-65535 Failed to find resource with id: XYZ
expectNoResources();
- }
- }
-
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26789")
- public void noScriptResourcesAfterExecutingFailingScript2() throws
Exception {
- for (String statement : STATEMENTS) {
- log.info("Run statement: {}", statement);
try (Connection conn = DriverManager.getConnection(JDBC_URL)) {
try (Statement stmt = conn.createStatement()) {
stmt.execute(statement);
}
- // Fails with IGN-CMN-65535 Failed to find resource with id:
XYZ
// All resources should be released
expectNoResources();
@@ -83,7 +71,6 @@ public class ItJdbcClusterPerIntegrationTest extends
ClusterPerTestIntegrationTe
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26789")
public void noResourcesScriptAfterClientTerminates() throws Exception {
for (String statement : STATEMENTS) {
log.info("Run statement: {}", statement);
diff --git
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
index 5a005bad391..0ea0c772ee1 100644
---
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
+++
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
@@ -18,10 +18,10 @@
package org.apache.ignite.jdbc;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.jdbc.util.JdbcTestUtils.assertThrowsSqlException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -35,7 +35,10 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.jdbc2.JdbcStatement2;
+import org.apache.ignite.internal.sql.SqlCommon;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -68,17 +71,10 @@ public class ItJdbcMultiStatementSelfTest extends
AbstractJdbcSelfTest {
@AfterEach
void tearDown() throws Exception {
- int openCursorResources = openResources(CLUSTER);
- // connection + not closed result set
- assertTrue(openResources(CLUSTER) <= 2, "Open cursors: " +
openCursorResources);
-
stmt.close();
- openCursorResources = openResources(CLUSTER);
-
- // only connection context or 0 if already closed.
- assertTrue(openResources(CLUSTER) <= 1, "Open cursors: " +
openCursorResources);
- assertTrue(waitForCondition(() -> openCursors(CLUSTER) == 0, 5_000));
+ Awaitility.await().timeout(5, TimeUnit.SECONDS).until(() ->
openResources(CLUSTER), is(0));
+ Awaitility.await().timeout(5, TimeUnit.SECONDS).until(() ->
openCursors(CLUSTER), is(0));
}
@Test
@@ -245,7 +241,7 @@ public class ItJdbcMultiStatementSelfTest extends
AbstractJdbcSelfTest {
@Test
public void requestMoreThanOneFetch() throws Exception {
- int range = stmt.getFetchSize() + 100;
+ int range = SqlCommon.DEFAULT_PAGE_SIZE + 100;
stmt.execute(format("START TRANSACTION; SELECT * FROM
TABLE(system_range(0, {})); COMMIT;", range));
assertEquals(range + 1, getResultSetSize());
@@ -268,6 +264,21 @@ public class ItJdbcMultiStatementSelfTest extends
AbstractJdbcSelfTest {
assertEquals(2, stmt.getResultSet().getInt(1));
}
+ @Test
+ public void statementMustCloseAllDependentCursors() throws SQLException {
+ int rowsCount = SqlCommon.DEFAULT_PAGE_SIZE * 3;
+
+ stmt.execute(format("SELECT * FROM SYSTEM_RANGE(0, {});"
+ + "SELECT * FROM SYSTEM_RANGE(0, {}); "
+ + "SELECT * FROM SYSTEM_RANGE(0, {});",
+ rowsCount, rowsCount, rowsCount));
+
+ stmt.close();
+
+ Awaitility.await().timeout(5, TimeUnit.SECONDS).until(() ->
openResources(CLUSTER), is(0));
+ Awaitility.await().timeout(5, TimeUnit.SECONDS).until(() ->
openCursors(CLUSTER), is(0));
+ }
+
@Test
public void testMixedDmlQueryExecute() throws Exception {
boolean res = stmt.execute("INSERT INTO TEST_TX VALUES (6, 5, '5');
DELETE FROM TEST_TX WHERE ID=6; SELECT 1;");
diff --git
a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc2/ResultSetWrapper.java
b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc2/ResultSetWrapper.java
index fc600917da9..e39c1fb5a33 100644
---
a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc2/ResultSetWrapper.java
+++
b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc2/ResultSetWrapper.java
@@ -74,26 +74,24 @@ final class ResultSetWrapper {
}
void close() throws SQLException {
- JdbcResultSet current = resultSet;
- if (current == null || current.closed) {
+ JdbcResultSet rs = resultSet;
+ if (rs == null || rs.closed) {
return;
}
- resultSet = null;
- JdbcResultSet rs = current;
+ resultSet = null;
do {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-26789 Close
properly.
- // w/o iteration over all cursors, the cursors that are not
retrieved hung in the void
- // and are never released.
+ // Without iterating over all cursors, the cursors that are
+ // not retrieved hang in the void and are never released.
try {
+ rs.close();
+
rs = rs.tryNextResultSet();
} catch (SQLException ignore) {
// This is an execution related exception, ignore it.
break;
}
} while (rs != null);
-
- current.close();
}
}