This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch jdbc_over_thin_sql
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/jdbc_over_thin_sql by this 
push:
     new 5e53a78b5e5 IGNITE-26789 Jdbc. Fix resource leak when client 
disconnects (#6879)
5e53a78b5e5 is described below

commit 5e53a78b5e5177540b1e4dbf93cf282eff2b7608
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Tue Nov 4 16:05:16 2025 +0300

    IGNITE-26789 Jdbc. Fix resource leak when client disconnects (#6879)
---
 .../handler/requests/sql/ClientSqlCommon.java      | 32 +++++++++++++++++----
 .../requests/sql/ClientSqlCursorCloseRequest.java  | 14 ++++++++-
 .../sql/ClientSqlCursorNextPageRequest.java        |  6 ++--
 .../sql/ClientSqlCursorNextResultRequest.java      |  3 +-
 .../requests/sql/ClientSqlExecuteRequest.java      |  2 +-
 .../jdbc2/ItJdbcClusterPerIntegrationTest.java     | 13 ---------
 .../ignite/jdbc/ItJdbcMultiStatementSelfTest.java  | 33 ++++++++++++++--------
 .../ignite/internal/jdbc2/ResultSetWrapper.java    | 16 +++++------
 8 files changed, 76 insertions(+), 43 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
index ff3d3e7f6a7..c5ae6f11eeb 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
 import org.apache.ignite.client.handler.ClientResource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -38,7 +40,6 @@ import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import 
org.apache.ignite.internal.sql.engine.prepare.partitionawareness.PartitionAwarenessMetadata;
-import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ColumnMetadata.ColumnOrigin;
 import org.apache.ignite.sql.ResultSetMetadata;
@@ -254,11 +255,12 @@ class ClientSqlCommon {
             int pageSize,
             boolean includePartitionAwarenessMeta,
             boolean sqlDirectTxMappingSupported,
-            boolean sqlMultiStatementSupported
+            boolean sqlMultiStatementSupported,
+            Executor executor
     ) {
         try {
             Long nextResultResourceId = sqlMultiStatementSupported && 
asyncResultSet.cursor().hasNextResult()
-                    ? 
saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize, 
resources)
+                    ? 
saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize, 
resources, executor)
                     : null;
 
             if ((asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages())) 
{
@@ -295,15 +297,35 @@ class ClientSqlCommon {
     private static Long saveNextResultResource(
             CompletableFuture<AsyncSqlCursor<InternalSqlRow>> nextResultFuture,
             int pageSize,
-            ClientResourceRegistry resources
+            ClientResourceRegistry resources,
+            Executor executor
     ) throws IgniteInternalCheckedException {
         ClientResource resource = new ClientResource(
                 new CursorWithPageSize(nextResultFuture, pageSize),
-                () -> nextResultFuture.thenApply(AsyncCursor::closeAsync));
+                () -> nextResultFuture.thenAccept(cur -> 
iterateThroughResultsAndCloseThem(cur, executor))
+        );
 
         return resources.put(resource);
     }
 
+    private static void 
iterateThroughResultsAndCloseThem(AsyncSqlCursor<InternalSqlRow> cursor, 
Executor executor) {
+        Function<AsyncSqlCursor<InternalSqlRow>, 
CompletableFuture<AsyncSqlCursor<InternalSqlRow>>> traverser = new Function<>() 
{
+            @Override
+            public CompletableFuture<AsyncSqlCursor<InternalSqlRow>> 
apply(AsyncSqlCursor<InternalSqlRow> cur) {
+                return cur.closeAsync()
+                        .thenComposeAsync(none -> {
+                            if (cur.hasNextResult()) {
+                                return cur.nextResult().thenComposeAsync(this, 
executor);
+                            } else {
+                                return CompletableFuture.completedFuture(cur);
+                            }
+                        }, executor);
+            }
+        };
+
+        CompletableFuture.completedFuture(cursor).thenCompose(traverser);
+    }
+
     private static void writeResultSet(
             ClientMessagePacker out,
             AsyncResultSetImpl res,
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
index d2eea6926f9..5e72923d1a7 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorCloseRequest.java
@@ -18,10 +18,13 @@
 package org.apache.ignite.client.handler.requests.sql;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientResource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
 import org.apache.ignite.client.handler.ResponseWriter;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.util.CompletableFutures;
 
 /**
  * Client SQL cursor close request.
@@ -39,8 +42,17 @@ public class ClientSqlCursorCloseRequest {
             ClientResourceRegistry resources
     ) throws IgniteInternalCheckedException {
         long resourceId = in.unpackLong();
+        ClientResource resource;
 
-        ClientSqlResultSet asyncResultSet = 
resources.remove(resourceId).get(ClientSqlResultSet.class);
+        try {
+            resource = resources.remove(resourceId);
+        } catch (IgniteInternalCheckedException | IgniteInternalException 
ignored) {
+            // TODO https://issues.apache.org/jira/browse/IGNITE-26927 Do not 
completely ignore "resource not found" exception
+            // Ignore: either resource was already removed during asynchronous 
data fetch request, or registry is closing.
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        ClientSqlResultSet asyncResultSet = 
resource.get(ClientSqlResultSet.class);
 
         return asyncResultSet.closeAsync().thenApply(v -> null);
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
index 86a2cbf21c1..d4b234c5010 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextPageRequest.java
@@ -25,6 +25,7 @@ import 
org.apache.ignite.client.handler.ClientResourceRegistry;
 import org.apache.ignite.client.handler.ResponseWriter;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
 
@@ -51,8 +52,9 @@ public class ClientSqlCursorNextPageRequest {
                     if (!r.hasMorePages()) {
                         try {
                             resources.remove(resourceId);
-                        } catch (IgniteInternalCheckedException ignored) {
-                            // Ignore: either resource already removed, or 
registry is closing.
+                        } catch (IgniteInternalCheckedException | 
IgniteInternalException ignored) {
+                            // TODO 
https://issues.apache.org/jira/browse/IGNITE-26927 Handle "resource not found" 
exception properly
+                            // Ignore: either resource already removed by 
concurrent close operation, or registry is closing.
                         }
 
                         return resultSet.closeAsync().thenApply(v -> 
getResponseWriter(r));
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
index 613ea270992..0c6025aad9e 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
@@ -59,7 +59,8 @@ public class ClientSqlCursorNextResultRequest {
                                         pageSize
                                 )
                         ).thenCompose(asyncResultSet ->
-                                ClientSqlCommon.writeResultSetAsync(resources, 
asyncResultSet, metrics, pageSize, false, false, true)
+                                ClientSqlCommon.writeResultSetAsync(resources, 
asyncResultSet, metrics, pageSize, false, false, true,
+                                        operationExecutor)
                         ).thenApply(rsWriter -> rsWriter), operationExecutor);
 
         f.whenCompleteAsync((r, t) -> {
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 758bc01a494..418c9238d5e 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -127,7 +127,7 @@ public class ClientSqlExecuteRequest {
                 arguments
         ).thenCompose(asyncResultSet ->
                         ClientSqlCommon.writeResultSetAsync(resources, 
asyncResultSet, metrics, props.pageSize(),
-                                includePartitionAwarenessMeta, 
sqlDirectTxMappingSupported, sqlMultistatementsSupported))
+                                includePartitionAwarenessMeta, 
sqlDirectTxMappingSupported, sqlMultistatementsSupported, operationExecutor))
                 .thenApply(rsWriter -> out -> {
                     if (tx != null) {
                         writeTxMeta(out, timestampTracker, clockService, tx, 
resIdHolder[0]);
diff --git 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/internal/jdbc2/ItJdbcClusterPerIntegrationTest.java
 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/internal/jdbc2/ItJdbcClusterPerIntegrationTest.java
index 06d37f12872..87ca5ea7671 100644
--- 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/internal/jdbc2/ItJdbcClusterPerIntegrationTest.java
+++ 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/internal/jdbc2/ItJdbcClusterPerIntegrationTest.java
@@ -26,7 +26,6 @@ import java.sql.Statement;
 import java.util.List;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.jdbc.ItJdbcStatementSelfTest;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -48,7 +47,6 @@ public class ItJdbcClusterPerIntegrationTest extends 
ClusterPerTestIntegrationTe
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26789";)
     public void noScriptResourcesAfterExecutingFailingScript() throws 
Exception {
         for (String statement : STATEMENTS) {
             log.info("Run statement: {}", statement);
@@ -58,23 +56,13 @@ public class ItJdbcClusterPerIntegrationTest extends 
ClusterPerTestIntegrationTe
                 stmt.execute(statement);
                 // Connection should close the statement
             }
-            // Fails with IGN-CMN-65535 Failed to find resource with id: XYZ
 
             expectNoResources();
-        }
-    }
-
-    @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26789";)
-    public void noScriptResourcesAfterExecutingFailingScript2() throws 
Exception {
-        for (String statement : STATEMENTS) {
-            log.info("Run statement: {}", statement);
 
             try (Connection conn = DriverManager.getConnection(JDBC_URL)) {
                 try (Statement stmt = conn.createStatement()) {
                     stmt.execute(statement);
                 }
-                // Fails with IGN-CMN-65535 Failed to find resource with id: 
XYZ
 
                 // All resources should be released
                 expectNoResources();
@@ -83,7 +71,6 @@ public class ItJdbcClusterPerIntegrationTest extends 
ClusterPerTestIntegrationTe
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26789";)
     public void noResourcesScriptAfterClientTerminates() throws Exception {
         for (String statement : STATEMENTS) {
             log.info("Run statement: {}", statement);
diff --git 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
index 5a005bad391..0ea0c772ee1 100644
--- 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
+++ 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcMultiStatementSelfTest.java
@@ -18,10 +18,10 @@
 package org.apache.ignite.jdbc;
 
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.jdbc.util.JdbcTestUtils.assertThrowsSqlException;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -35,7 +35,10 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.jdbc2.JdbcStatement2;
+import org.apache.ignite.internal.sql.SqlCommon;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -68,17 +71,10 @@ public class ItJdbcMultiStatementSelfTest extends 
AbstractJdbcSelfTest {
 
     @AfterEach
     void tearDown() throws Exception {
-        int openCursorResources = openResources(CLUSTER);
-        // connection + not closed result set
-        assertTrue(openResources(CLUSTER) <= 2, "Open cursors: " + 
openCursorResources);
-
         stmt.close();
 
-        openCursorResources = openResources(CLUSTER);
-
-        // only connection context or 0 if already closed.
-        assertTrue(openResources(CLUSTER) <= 1, "Open cursors: " + 
openCursorResources);
-        assertTrue(waitForCondition(() -> openCursors(CLUSTER) == 0, 5_000));
+        Awaitility.await().timeout(5, TimeUnit.SECONDS).until(() -> 
openResources(CLUSTER), is(0));
+        Awaitility.await().timeout(5, TimeUnit.SECONDS).until(() -> 
openCursors(CLUSTER), is(0));
     }
 
     @Test
@@ -245,7 +241,7 @@ public class ItJdbcMultiStatementSelfTest extends 
AbstractJdbcSelfTest {
 
     @Test
     public void requestMoreThanOneFetch() throws Exception {
-        int range = stmt.getFetchSize() + 100;
+        int range = SqlCommon.DEFAULT_PAGE_SIZE + 100;
         stmt.execute(format("START TRANSACTION; SELECT * FROM 
TABLE(system_range(0, {})); COMMIT;", range));
         assertEquals(range + 1, getResultSetSize());
 
@@ -268,6 +264,21 @@ public class ItJdbcMultiStatementSelfTest extends 
AbstractJdbcSelfTest {
         assertEquals(2, stmt.getResultSet().getInt(1));
     }
 
+    @Test
+    public void statementMustCloseAllDependentCursors() throws SQLException {
+        int rowsCount = SqlCommon.DEFAULT_PAGE_SIZE * 3;
+
+        stmt.execute(format("SELECT * FROM SYSTEM_RANGE(0, {});"
+                        + "SELECT * FROM SYSTEM_RANGE(0, {}); "
+                        + "SELECT * FROM SYSTEM_RANGE(0, {});",
+                rowsCount, rowsCount, rowsCount));
+
+        stmt.close();
+
+        Awaitility.await().timeout(5, TimeUnit.SECONDS).until(() -> 
openResources(CLUSTER), is(0));
+        Awaitility.await().timeout(5, TimeUnit.SECONDS).until(() -> 
openCursors(CLUSTER), is(0));
+    }
+
     @Test
     public void testMixedDmlQueryExecute() throws Exception {
         boolean res = stmt.execute("INSERT INTO TEST_TX VALUES (6, 5, '5'); 
DELETE FROM TEST_TX WHERE ID=6; SELECT 1;");
diff --git 
a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc2/ResultSetWrapper.java
 
b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc2/ResultSetWrapper.java
index fc600917da9..e39c1fb5a33 100644
--- 
a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc2/ResultSetWrapper.java
+++ 
b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc2/ResultSetWrapper.java
@@ -74,26 +74,24 @@ final class ResultSetWrapper {
     }
 
     void close() throws SQLException {
-        JdbcResultSet current = resultSet;
-        if (current == null || current.closed) {
+        JdbcResultSet rs = resultSet;
+        if (rs == null || rs.closed) {
             return;
         }
-        resultSet = null;
 
-        JdbcResultSet rs = current;
+        resultSet = null;
 
         do {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-26789 Close 
properly.
-            // w/o iteration over all cursors, the cursors that are not 
retrieved hung in the void
-            // and are never released.
+            // w/o iteration over all cursors, the cursors that are
+            // not retrieved hung in the void and are never released.
             try {
+                rs.close();
+
                 rs = rs.tryNextResultSet();
             } catch (SQLException ignore) {
                 // This is an execution related exception, ignore it. 
                 break;
             }
         } while (rs != null);
-
-        current.close();
     }
 }

Reply via email to