This is an automated email from the ASF dual-hosted git repository.

jooger pushed a commit to branch jdbc_over_thin_sql
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 3d2a8ebaa749ab0bf0b3729e8ce3ab810bd8cd27
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Wed Aug 20 09:41:30 2025 +0300

    IGNITE-26087 Ability to obtain results of a multi-statement query execution using the internal thin client SQL API. (#6397)
---
 .../ignite/internal/client/proto/ClientOp.java     |   3 +
 .../ignite/internal/client/sql/QueryModifier.java  |   7 +-
 .../client/proto/sql/QueryModifierTest.java        |   5 +-
 .../handler/ClientInboundMessageHandler.java       |   4 +
 .../client/handler/ClientResourceRegistry.java     |   6 +
 .../handler/requests/sql/ClientSqlCommon.java      | 152 ++++++
 .../sql/ClientSqlCursorNextResultRequest.java      |  64 +++
 .../requests/sql/ClientSqlExecuteRequest.java      | 134 ++----
 .../handler/requests/sql/ClientSqlProperties.java  |   2 +-
 .../handler/requests/sql/ClientSqlCommonTest.java  |   2 +-
 .../apache/ignite/client/ClientOperationType.java  |   5 +
 .../org/apache/ignite/client/RetryReadPolicy.java  |   1 +
 .../apache/ignite/internal/client/ClientUtils.java |   3 +
 .../internal/client/sql/ClientAsyncResultSet.java  |  68 ++-
 .../ignite/internal/client/sql/ClientSql.java      |   6 +-
 .../org/apache/ignite/client/ClientSqlTest.java    |   4 +
 .../org/apache/ignite/client/fakes/FakeCursor.java |   5 +-
 .../client/ItThinClientMultistatementSqlTest.java  | 529 +++++++++++++++++++++
 .../runner/app/client/ItThinClientSqlTest.java     |  21 +-
 .../internal/sql/api/AsyncResultSetImpl.java       |   5 +
 .../engine/util/SqlExceptionMapperProvider.java    |   4 +
 21 files changed, 916 insertions(+), 114 deletions(-)

diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index d0a3f5bb0b5..377745e3f08 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -200,6 +200,9 @@ public class ClientOp {
     /** Response to a server->client operation. */
     public static final int SERVER_OP_RESPONSE = 73;
 
+    /** Get next result set. */
+    public static final int SQL_CURSOR_NEXT_RESULT_SET = 74;
+
     /** Reserved for extensions: min. */
     @SuppressWarnings("unused")
     public static final int RESERVED_EXTENSION_RANGE_START = 1000;
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java
index 60bfcc28e74..d910549559c 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java
@@ -34,13 +34,16 @@ public enum QueryModifier {
     ALLOW_APPLIED_RESULT(2),
 
     /** Queries with transaction control statements. */
-    ALLOW_TX_CONTROL(3);
+    ALLOW_TX_CONTROL(3),
+
+    /** Queries with multiple statements. */
+    ALLOW_MULTISTATEMENT(4);
 
     /** A set containing all modifiers. **/
     public static final Set<QueryModifier> ALL = 
EnumSet.allOf(QueryModifier.class);
 
     /** A set of modifiers that can apply to single statements. **/
-    public static final Set<QueryModifier> SINGLE_STMT_MODIFIERS = 
EnumSet.complementOf(EnumSet.of(ALLOW_TX_CONTROL));
+    public static final Set<QueryModifier> SINGLE_STMT_MODIFIERS = 
EnumSet.complementOf(EnumSet.of(ALLOW_TX_CONTROL, ALLOW_MULTISTATEMENT));
 
     private static final QueryModifier[] VALS = new 
QueryModifier[values().length];
 
diff --git 
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java
 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java
index 443f604e5d3..a4821d85333 100644
--- 
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java
+++ 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java
@@ -58,7 +58,10 @@ public class QueryModifierTest {
                 Arguments.of(Set.of(QueryModifier.ALLOW_ROW_SET_RESULT, 
QueryModifier.ALLOW_AFFECTED_ROWS_RESULT)),
                 Arguments.of(Set.of(QueryModifier.ALLOW_AFFECTED_ROWS_RESULT, 
QueryModifier.ALLOW_APPLIED_RESULT)),
                 Arguments.of(Set.of(QueryModifier.ALLOW_APPLIED_RESULT, 
QueryModifier.ALLOW_TX_CONTROL)),
-                Arguments.of(Set.of(QueryModifier.ALLOW_TX_CONTROL, 
QueryModifier.ALLOW_ROW_SET_RESULT)),
+                Arguments.of(Set.of(QueryModifier.ALLOW_TX_CONTROL, 
QueryModifier.ALLOW_MULTISTATEMENT)),
+                Arguments.of(Set.of(QueryModifier.ALLOW_MULTISTATEMENT, 
QueryModifier.ALLOW_ROW_SET_RESULT)),
+                Arguments.of(Set.of(QueryModifier.ALLOW_ROW_SET_RESULT, 
QueryModifier.ALLOW_MULTISTATEMENT,
+                        QueryModifier.ALLOW_APPLIED_RESULT)),
                 Arguments.of(QueryModifier.ALL));
     }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index b4e90eb9b46..98ee804a10e 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -80,6 +80,7 @@ import 
org.apache.ignite.client.handler.requests.jdbc.ClientJdbcTableMetadataReq
 import org.apache.ignite.client.handler.requests.jdbc.JdbcMetadataCatalog;
 import 
org.apache.ignite.client.handler.requests.sql.ClientSqlCursorCloseRequest;
 import 
org.apache.ignite.client.handler.requests.sql.ClientSqlCursorNextPageRequest;
+import 
org.apache.ignite.client.handler.requests.sql.ClientSqlCursorNextResultRequest;
 import 
org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteBatchRequest;
 import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteRequest;
 import 
org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteScriptRequest;
@@ -959,6 +960,9 @@ public class ClientInboundMessageHandler
                         clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT)
                 );
 
+            case ClientOp.SQL_CURSOR_NEXT_RESULT_SET:
+                return ClientSqlCursorNextResultRequest.process(in, resources, 
partitionOperationsExecutor, metrics);
+
             case ClientOp.OPERATION_CANCEL:
                 return ClientOperationCancelRequest.process(in, cancelHandles);
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
index ed418d70a2a..23fc40cc33a 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Per-connection resource registry.
@@ -140,6 +141,11 @@ public class ClientResourceRegistry {
         }
     }
 
+    @TestOnly
+    public int size() {
+        return res.size();
+    }
+
     /**
      * Enters the lock.
      */
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
index dc251f2bf58..ff3d3e7f6a7 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
@@ -23,15 +23,28 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientHandlerMetricSource;
+import org.apache.ignite.client.handler.ClientResource;
+import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.client.handler.ResponseWriter;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.sql.QueryModifier;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import 
org.apache.ignite.internal.sql.engine.prepare.partitionawareness.PartitionAwarenessMetadata;
+import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ColumnMetadata.ColumnOrigin;
 import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Common SQL request handling logic.
@@ -223,6 +236,9 @@ class ClientSqlCommon {
                     queryTypes.add(SqlQueryType.TX_CONTROL);
                     break;
 
+                case ALLOW_MULTISTATEMENT:
+                    break;
+
                 default:
                     throw new IllegalArgumentException("Unexpected modifier " 
+ queryModifier);
             }
@@ -230,4 +246,140 @@ class ClientSqlCommon {
 
         return queryTypes;
     }
+
+    static CompletableFuture<ResponseWriter> writeResultSetAsync(
+            ClientResourceRegistry resources,
+            AsyncResultSetImpl asyncResultSet,
+            ClientHandlerMetricSource metrics,
+            int pageSize,
+            boolean includePartitionAwarenessMeta,
+            boolean sqlDirectTxMappingSupported,
+            boolean sqlMultiStatementSupported
+    ) {
+        try {
+            Long nextResultResourceId = sqlMultiStatementSupported && 
asyncResultSet.cursor().hasNextResult()
+                    ? 
saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize, 
resources)
+                    : null;
+
+            if ((asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages())) 
{
+                metrics.cursorsActiveIncrement();
+
+                var clientResultSet = new ClientSqlResultSet(asyncResultSet, 
metrics);
+
+                ClientResource resource = new ClientResource(
+                        clientResultSet,
+                        clientResultSet::closeAsync);
+
+                var resourceId = resources.put(resource);
+
+                return CompletableFuture.completedFuture(out ->
+                        writeResultSet(out, asyncResultSet, resourceId, 
includePartitionAwarenessMeta,
+                                sqlDirectTxMappingSupported, 
sqlMultiStatementSupported, nextResultResourceId));
+            }
+
+            return asyncResultSet.closeAsync()
+                    .thenApply(v -> (ResponseWriter) out ->
+                            writeResultSet(out, asyncResultSet, null, 
includePartitionAwarenessMeta,
+                                    sqlDirectTxMappingSupported, 
sqlMultiStatementSupported, nextResultResourceId));
+
+        } catch (IgniteInternalCheckedException e) {
+            // Resource registry was closed.
+            return asyncResultSet
+                    .closeAsync()
+                    .thenRun(() -> {
+                        throw new IgniteInternalException(e.getMessage(), e);
+                    });
+        }
+    }
+
+    private static Long saveNextResultResource(
+            CompletableFuture<AsyncSqlCursor<InternalSqlRow>> nextResultFuture,
+            int pageSize,
+            ClientResourceRegistry resources
+    ) throws IgniteInternalCheckedException {
+        ClientResource resource = new ClientResource(
+                new CursorWithPageSize(nextResultFuture, pageSize),
+                () -> nextResultFuture.thenApply(AsyncCursor::closeAsync));
+
+        return resources.put(resource);
+    }
+
+    private static void writeResultSet(
+            ClientMessagePacker out,
+            AsyncResultSetImpl res,
+            @Nullable Long resourceId,
+            boolean includePartitionAwarenessMeta,
+            boolean sqlDirectTxMappingSupported,
+            boolean sqlMultiStatementsSupported,
+            @Nullable Long nextResultResourceId
+    ) {
+        out.packLongNullable(resourceId);
+
+        out.packBoolean(res.hasRowSet());
+        out.packBoolean(res.hasMorePages());
+        out.packBoolean(res.wasApplied());
+        out.packLong(res.affectedRows());
+
+        packMeta(out, res.metadata());
+
+        if (includePartitionAwarenessMeta) {
+            packPartitionAwarenessMeta(out, res.partitionAwarenessMetadata(), 
sqlDirectTxMappingSupported);
+        }
+
+        if (sqlMultiStatementsSupported) {
+            out.packLongNullable(nextResultResourceId);
+        }
+
+        if (res.hasRowSet()) {
+            packCurrentPage(out, res);
+        }
+    }
+
+    private static void packMeta(ClientMessagePacker out, @Nullable 
ResultSetMetadata meta) {
+        // TODO IGNITE-17179 metadata caching - avoid sending same meta over and over.
+        if (meta == null || meta.columns() == null) {
+            out.packInt(0);
+            return;
+        }
+
+        packColumns(out, meta.columns());
+    }
+
+    private static void packPartitionAwarenessMeta(
+            ClientMessagePacker out,
+            @Nullable PartitionAwarenessMetadata meta,
+            boolean sqlDirectTxMappingSupported
+    ) {
+        if (meta == null) {
+            out.packNil();
+            return;
+        }
+
+        out.packInt(meta.tableId());
+        out.packIntArray(meta.indexes());
+        out.packIntArray(meta.hash());
+
+        if (sqlDirectTxMappingSupported) {
+            out.packByte(meta.directTxMode().id);
+        }
+    }
+
+    /** Holder of the cursor future and page size. */
+    static class CursorWithPageSize {
+        private final CompletableFuture<AsyncSqlCursor<InternalSqlRow>> 
cursorFuture;
+        private final int pageSize;
+
+        CursorWithPageSize(CompletableFuture<AsyncSqlCursor<InternalSqlRow>> 
cursorFuture, int pageSize) {
+            this.cursorFuture = cursorFuture;
+            this.pageSize = pageSize;
+        }
+
+        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture() {
+            return cursorFuture;
+        }
+
+        int pageSize() {
+            return pageSize;
+        }
+    }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
new file mode 100644
index 00000000000..5e8752fae65
--- /dev/null
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.sql;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.client.handler.ClientHandlerMetricSource;
+import org.apache.ignite.client.handler.ClientResource;
+import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.client.handler.ResponseWriter;
+import 
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.CursorWithPageSize;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
+import org.apache.ignite.sql.SqlRow;
+
+/**
+ * Client SQL cursor next result.
+ */
+public class ClientSqlCursorNextResultRequest {
+    /**
+     * Processes the request.
+     *
+     * @param in Unpacker.
+     * @return Future representing result of operation.
+     */
+    public static CompletableFuture<ResponseWriter> process(
+            ClientMessageUnpacker in,
+            ClientResourceRegistry resources,
+            Executor operationExecutor,
+            ClientHandlerMetricSource metrics
+    ) throws IgniteInternalCheckedException {
+        long resourceId = in.unpackLong();
+        ClientResource resource = resources.remove(resourceId);
+        CursorWithPageSize cursorWithPageSize = 
resource.get(CursorWithPageSize.class);
+        int pageSize = cursorWithPageSize.pageSize();
+
+        return cursorWithPageSize.cursorFuture()
+                .thenComposeAsync(cur -> cur.requestNextAsync(pageSize)
+                        .thenApply(batchRes -> new AsyncResultSetImpl<SqlRow>(
+                                        cur,
+                                        batchRes,
+                                        pageSize
+                                )
+                        ).thenCompose(asyncResultSet ->
+                                ClientSqlCommon.writeResultSetAsync(resources, 
asyncResultSet, metrics, pageSize, false, false, true)
+                        ).thenApply(rsWriter -> rsWriter), operationExecutor);
+    }
+}
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index 89bfbd7bc07..758bc01a494 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -17,39 +17,38 @@
 
 package org.apache.ignite.client.handler.requests.sql;
 
-import static 
org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.packCurrentPage;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
 import static 
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
+import java.util.function.Function;
 import org.apache.ignite.client.handler.ClientHandlerMetricSource;
-import org.apache.ignite.client.handler.ClientResource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
 import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.client.handler.ResponseWriter;
-import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
-import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
-import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.sql.api.AsyncResultSetImpl;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.sql.engine.SqlProperties;
-import 
org.apache.ignite.internal.sql.engine.prepare.partitionawareness.PartitionAwarenessMetadata;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.CancelHandle;
 import org.apache.ignite.lang.CancellationToken;
-import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
@@ -57,7 +56,6 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Client SQL execute request.
  */
-@SuppressWarnings({"rawtypes", "unchecked"})
 public class ClientSqlExecuteRequest {
     /**
      * Processes the request.
@@ -128,7 +126,8 @@ public class ClientSqlExecuteRequest {
                 () -> cancelHandles.remove(requestId),
                 arguments
         ).thenCompose(asyncResultSet ->
-                        writeResultSetAsync(resources, asyncResultSet, 
metrics, includePartitionAwarenessMeta, sqlDirectTxMappingSupported))
+                        ClientSqlCommon.writeResultSetAsync(resources, 
asyncResultSet, metrics, props.pageSize(),
+                                includePartitionAwarenessMeta, 
sqlDirectTxMappingSupported, sqlMultistatementsSupported))
                 .thenApply(rsWriter -> out -> {
                     if (tx != null) {
                         writeTxMeta(out, timestampTracker, clockService, tx, 
resIdHolder[0]);
@@ -146,95 +145,6 @@ public class ClientSqlExecuteRequest {
         return arguments == null ? ArrayUtils.OBJECT_EMPTY_ARRAY : arguments;
     }
 
-    private static CompletableFuture<ResponseWriter> writeResultSetAsync(
-            ClientResourceRegistry resources,
-            AsyncResultSetImpl asyncResultSet,
-            ClientHandlerMetricSource metrics,
-            boolean includePartitionAwarenessMeta,
-            boolean sqlDirectTxMappingSupported
-    ) {
-        if (asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages()) {
-            try {
-                metrics.cursorsActiveIncrement();
-
-                var clientResultSet = new ClientSqlResultSet(asyncResultSet, 
metrics);
-
-                ClientResource resource = new ClientResource(
-                        clientResultSet,
-                        clientResultSet::closeAsync);
-
-                var resourceId = resources.put(resource);
-
-                return CompletableFuture.completedFuture(out ->
-                        writeResultSet(out, asyncResultSet, resourceId, 
includePartitionAwarenessMeta, sqlDirectTxMappingSupported));
-            } catch (IgniteInternalCheckedException e) {
-                return asyncResultSet
-                        .closeAsync()
-                        .thenRun(() -> {
-                            throw new IgniteInternalException(e.getMessage(), 
e);
-                        });
-            }
-        }
-
-        return asyncResultSet.closeAsync()
-                .thenApply(v -> (ResponseWriter) out ->
-                        writeResultSet(out, asyncResultSet, null, 
includePartitionAwarenessMeta, sqlDirectTxMappingSupported));
-    }
-
-    private static void writeResultSet(
-            ClientMessagePacker out,
-            AsyncResultSetImpl res,
-            @Nullable Long resourceId,
-            boolean includePartitionAwarenessMeta,
-            boolean sqlDirectTxMappingSupported
-    ) {
-        out.packLongNullable(resourceId);
-
-        out.packBoolean(res.hasRowSet());
-        out.packBoolean(res.hasMorePages());
-        out.packBoolean(res.wasApplied());
-        out.packLong(res.affectedRows());
-
-        packMeta(out, res.metadata());
-
-        if (includePartitionAwarenessMeta) {
-            packPartitionAwarenessMeta(out, res.partitionAwarenessMetadata(), 
sqlDirectTxMappingSupported);
-        }
-
-        if (res.hasRowSet()) {
-            packCurrentPage(out, res);
-        }
-    }
-
-    private static void packMeta(ClientMessagePacker out, @Nullable 
ResultSetMetadata meta) {
-        // TODO IGNITE-17179 metadata caching - avoid sending same meta over and over.
-        if (meta == null || meta.columns() == null) {
-            out.packInt(0);
-            return;
-        }
-
-        ClientSqlCommon.packColumns(out, meta.columns());
-    }
-
-    private static void packPartitionAwarenessMeta(
-            ClientMessagePacker out,
-            @Nullable PartitionAwarenessMetadata meta,
-            boolean sqlDirectTxMappingSupported
-    ) {
-        if (meta == null) {
-            out.packNil();
-            return;
-        }
-
-        out.packInt(meta.tableId());
-        out.packIntArray(meta.indexes());
-        out.packIntArray(meta.hash());
-
-        if (sqlDirectTxMappingSupported) {
-            out.packByte(meta.directTxMode().id);
-        }
-    }
-
     private static CompletableFuture<AsyncResultSetImpl<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
             QueryProcessor qryProc,
@@ -256,7 +166,7 @@ public class ClientSqlExecuteRequest {
                         arguments
                     )
                     .thenCompose(cur -> {
-                                cur.onClose().whenComplete((none, ignore) -> 
onComplete.run());
+                                doWhenAllCursorsComplete(cur, onComplete);
 
                                 return cur.requestNextAsync(pageSize)
                                         .thenApply(
@@ -280,4 +190,30 @@ public class ClientSqlExecuteRequest {
             return CompletableFuture.failedFuture(mapToPublicSqlException(e));
         }
     }
+
+    private static void 
doWhenAllCursorsComplete(AsyncSqlCursor<InternalSqlRow> cursor, Runnable 
action) {
+        List<CompletableFuture<?>> dependency = new ArrayList<>();
+        var cursorChainTraverser = new Function<AsyncSqlCursor<?>, 
CompletableFuture<AsyncSqlCursor<?>>>() {
+            @Override
+            public CompletableFuture<AsyncSqlCursor<?>> 
apply(AsyncSqlCursor<?> cursor) {
+                dependency.add(cursor.onClose());
+
+                if (cursor.hasNextResult()) {
+                    return cursor.nextResult().thenCompose(this);
+                }
+
+                return allOf(dependency)
+                        .thenRun(action)
+                        .thenApply(ignored -> cursor);
+            }
+        };
+
+        cursorChainTraverser
+                .apply(cursor)
+                .exceptionally(ex -> {
+                    action.run();
+
+                    return null;
+                });
+    }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
index 2931dd315eb..23f010e94ad 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java
@@ -75,7 +75,7 @@ class ClientSqlProperties {
         SqlProperties sqlProperties = new SqlProperties()
                 .queryTimeout(queryTimeout)
                 
.allowedQueryTypes(ClientSqlCommon.convertQueryModifierToQueryType(queryModifiers))
-                .allowMultiStatement(false);
+                
.allowMultiStatement(queryModifiers.contains(QueryModifier.ALLOW_MULTISTATEMENT));
 
         if (schema != null) {
             sqlProperties.defaultSchema(schema);
diff --git 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java
 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java
index da7041d4baa..c18556a5135 100644
--- 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java
+++ 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java
@@ -43,7 +43,7 @@ public class ClientSqlCommonTest {
     void testConvertQueryModifierToQueryType(QueryModifier type) {
         Set<SqlQueryType> sqlQueryTypes = 
ClientSqlCommon.convertQueryModifierToQueryType(Set.of(type));
 
-        assertFalse(sqlQueryTypes.isEmpty());
+        assertThat(sqlQueryTypes.isEmpty(), is(type == 
QueryModifier.ALLOW_MULTISTATEMENT));
 
         sqlQueryTypes.forEach(sqlQueryType -> {
             switch (type) {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
 
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
index b2a2d2f0820..f0a9f13f20a 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -175,6 +175,11 @@ public enum ClientOperationType {
      */
     SQL_CURSOR_NEXT_PAGE,
 
+    /**
+     * SQL Cursor Next ResultSet.
+     */
+    SQL_CURSOR_NEXT_RESULT_SET,
+
     /**
      * Send streamer batch ({@link DataStreamerTarget#streamData}).
      */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java 
b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
index d6c20a7855c..44e0373106a 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
@@ -59,6 +59,7 @@ public class RetryReadPolicy extends RetryLimitPolicy {
             case SQL_EXECUTE:
             case SQL_EXECUTE_BATCH:
             case SQL_CURSOR_NEXT_PAGE:
+            case SQL_CURSOR_NEXT_RESULT_SET:
             case SQL_EXECUTE_SCRIPT:
             case STREAMER_BATCH_SEND:
             case PRIMARY_REPLICAS_GET:
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index ad969d2d3d5..f8661224bc1 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -175,6 +175,9 @@ public class ClientUtils {
             case ClientOp.SQL_CURSOR_NEXT_PAGE:
                 return ClientOperationType.SQL_CURSOR_NEXT_PAGE;
 
+            case ClientOp.SQL_CURSOR_NEXT_RESULT_SET:
+                return ClientOperationType.SQL_CURSOR_NEXT_RESULT_SET;
+
             case ClientOp.SQL_CURSOR_CLOSE:
                 return null;
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
index 82f1efaba4b..97d1cfd09fe 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
@@ -22,7 +22,9 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -46,7 +48,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Client async result set.
  */
-class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
+public class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
     /** Channel. */
     private final ClientChannel ch;
 
@@ -84,6 +86,17 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
     /** Closed flag. */
     private volatile boolean closed;
 
+    /** ID of the resource that holds the next cursor, can be {@code null} if 
the current result set is the last one. */
+    @Nullable
+    private final Long nextResultResourceId;
+
+    /** Future that holds the next result set, can be {@code null} if the 
current result set is the last one. */
+    @Nullable
+    private final CompletableFuture<ClientAsyncResultSet<T>> nextResultFuture;
+
+    /** A flag indicating whether the next result set was already requested or 
not. */
+    private final AtomicBoolean nextResultSetRetrieved = new AtomicBoolean();
+
     /**
      * Constructor.
      *
@@ -93,6 +106,7 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
      * @param mapper Mapper.
      * @param partitionAwarenessEnabled Whether partitions awareness is 
enabled, hence response may contain related metadata.
      * @param sqlDirectMappingSupported Whether direct mapping is supported, 
hence response may contain additional metadata.
+     * @param sqlMultiStatementsSupported Whether iteration over the results 
of script execution is supported.
      */
     ClientAsyncResultSet(
             ClientChannel ch,
@@ -100,7 +114,8 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
             ClientMessageUnpacker in,
             @Nullable Mapper<T> mapper,
             boolean partitionAwarenessEnabled,
-            boolean sqlDirectMappingSupported
+            boolean sqlDirectMappingSupported,
+            boolean sqlMultiStatementsSupported
     ) {
         this.ch = ch;
 
@@ -117,6 +132,14 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> 
{
             partitionAwarenessMetadata = null;
         }
 
+        if (sqlMultiStatementsSupported && !in.tryUnpackNil()) {
+            nextResultResourceId = in.unpackLong();
+            nextResultFuture = new CompletableFuture<>();
+        } else {
+            nextResultResourceId = null;
+            nextResultFuture = null;
+        }
+
         this.mapper = mapper;
         marshaller = metadata != null && mapper != null && mapper.targetType() 
!= SqlRow.class
                 ? marshaller(metadata, marshallers, mapper)
@@ -139,6 +162,47 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> 
{
         return hasRowSet;
     }
 
+    /**
+     * Returns flag indicating whether the current result set is the result of
+     * a multi-statement query and this statement is not the last one.
+     */
+    public boolean hasNextResultSet() {
+        return nextResultResourceId != null;
+    }
+
+    /**
+     * Retrieves the next result set of a multi-statement query.
+     *
+     * @return Future that completes with the next result set, or fails with
+     *     {@link NoSuchElementException} if the query has no more results.
+     */
+    public CompletableFuture<ClientAsyncResultSet<T>> nextResultSet() {
+        if (nextResultResourceId == null) {
+            return CompletableFuture.failedFuture(new 
NoSuchElementException("Query has no more results"));
+        }
+
+        assert nextResultFuture != null;
+
+        if (!nextResultSetRetrieved.compareAndSet(false, true)) {
+            return nextResultFuture;
+        }
+
+        
ch.<ClientAsyncResultSet<T>>serviceAsync(ClientOp.SQL_CURSOR_NEXT_RESULT_SET,
+                        w -> w.out().packLong(nextResultResourceId),
+                        r -> new ClientAsyncResultSet<>(
+                                r.clientChannel(), null, r.in(), null, false, 
false, true
+                        ))
+                .whenComplete((r, e) -> {
+                    if (e != null) {
+                        nextResultFuture.completeExceptionally(e);
+                    } else {
+                        nextResultFuture.complete(r);
+                    }
+                });
+
+        return nextResultFuture;
+    }
+
     /** {@inheritDoc} */
     @Override
     public long affectedRows() {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 9d9e01f875b..568c0df0300 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -321,6 +321,9 @@ public class ClientSql implements IgniteSql {
             Statement statement,
             @Nullable Object... arguments
     ) {
+        assert mapper == null || mapper.targetType() == SqlRow.class
+                || 
!queryModifiers.contains(QueryModifier.ALLOW_MULTISTATEMENT) : "Mapper is not 
supported for multi-statements.";
+
         Objects.requireNonNull(statement);
 
         PartitionMappingProvider mappingProvider = 
mappingProviderCache.getIfPresent(new PaCacheKey(statement));
@@ -373,10 +376,11 @@ public class ClientSql implements IgniteSql {
                     && 
r.clientChannel().protocolContext().isFeatureSupported(SQL_PARTITION_AWARENESS);
 
             boolean sqlDirectMappingSupported = 
r.clientChannel().protocolContext().isFeatureSupported(SQL_DIRECT_TX_MAPPING);
+            boolean sqlMultistatementsSupported = 
r.clientChannel().protocolContext().allFeaturesSupported(SQL_MULTISTATEMENT_SUPPORT);
 
             DirectTxUtils.readTx(r, ctx, tx, ch.observableTimestamp());
             ClientAsyncResultSet<T> rs = new ClientAsyncResultSet<>(
-                    r.clientChannel(), marshallers, r.in(), mapper, 
tryUnpackPaMeta, sqlDirectMappingSupported
+                    r.clientChannel(), marshallers, r.in(), mapper, 
tryUnpackPaMeta, sqlDirectMappingSupported, sqlMultistatementsSupported
             );
 
             ClientPartitionAwarenessMetadata partitionAwarenessMetadata = 
rs.partitionAwarenessMetadata();
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
index 682cab4bfdc..8411c929f86 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
@@ -307,6 +307,10 @@ public class ClientSqlTest extends AbstractClientTableTest 
{
                     expected = "TX_CONTROL";
                     break;
 
+                case ALLOW_MULTISTATEMENT:
+                    expected = "MULTISTATEMENT";
+                    break;
+
                 default:
                     throw new IllegalArgumentException("Unexpected type: " + 
modifier);
             }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
index 51759bb99ce..9372dc85e0d 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.SqlProperties;
@@ -121,8 +122,8 @@ public class FakeCursor implements 
AsyncSqlCursor<InternalSqlRow> {
             columns.add(new FakeColumnMetadata("col1", ColumnType.INT32));
         } else if ("SELECT ALLOWED QUERY TYPES".equals(qry)) {
             paMeta = null;
-            String row = 
properties.allowedQueryTypes().stream().map(SqlQueryType::name).sorted()
-                    .collect(Collectors.joining(", "));
+            String row = 
Stream.concat(properties.allowedQueryTypes().stream().map(SqlQueryType::name).sorted(),
+                    properties.allowMultiStatement() ? 
Stream.of("MULTISTATEMENT") : Stream.empty()).collect(Collectors.joining(", "));
             rows.add(getRow(row));
             columns.add(new FakeColumnMetadata("col1", ColumnType.STRING));
         } else {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientMultistatementSqlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientMultistatementSqlTest.java
new file mode 100644
index 00000000000..bfb3ab34645
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientMultistatementSqlTest.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app.client;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
+import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.internal.client.sql.ClientAsyncResultSet;
+import org.apache.ignite.internal.client.sql.ClientSql;
+import org.apache.ignite.internal.client.sql.QueryModifier;
+import org.apache.ignite.internal.sql.SyncResultSetAdapter;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
+import org.awaitility.Awaitility;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Thin client SQL multi-statement integration test.
+ *
+ * <p>Tests to check internal API for reading script execution results.
+ */
+@SuppressWarnings("resource")
+public class ItThinClientMultistatementSqlTest extends 
ItAbstractThinClientTest {
+    private final List<ClientAsyncResultSet<SqlRow>> resultsToClose = new 
ArrayList<>();
+
+    private int resourcesBefore;
+
+    @BeforeEach
+    void setup() {
+        resultsToClose.forEach(resultSet -> await(resultSet.closeAsync()));
+
+        resourcesBefore = countResources();
+    }
+
+    @AfterEach
+    protected void checkNoPendingTransactionsAndOpenedCursors() {
+        Awaitility.await().timeout(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            for (int i = 0; i < nodes(); i++) {
+                assertThat("node=" + i, queryProcessor(i).openedCursors(), 
is(0));
+            }
+
+            for (int i = 0; i < nodes(); i++) {
+                assertThat("node=" + i, txManager(i).pending(), is(0));
+            }
+
+            assertThat(countResources() - resourcesBefore, is(0));
+
+            for (int i = 0; i < nodes(); i++) {
+                int cancelHandlesCount = 
unwrapIgniteImpl(server(i)).clientInboundMessageHandler().cancelHandlesCount();
+
+                assertThat("node=" + i, cancelHandlesCount, is(0));
+            }
+        });
+
+        String dropTablesScript = client().tables().tables().stream()
+                .map(Table::name)
+                .map(name -> "DROP TABLE " + name)
+                .collect(Collectors.joining(";\n"));
+
+        if (!dropTablesScript.isEmpty()) {
+            client().sql().executeScript(dropTablesScript);
+        }
+    }
+
+    /** Ensures that we can get the next result set after the current one is 
closed. */
+    @Test
+    void checkIterationOverResultSets() {
+        ClientAsyncResultSet<SqlRow> asyncRs = runSql("SELECT 1; SELECT 2; 
SELECT 3;");
+
+        // First result set.
+        {
+            SyncResultSetAdapter<SqlRow> rs = new 
SyncResultSetAdapter<>(asyncRs);
+            assertThat(rs.hasRowSet(), is(true));
+            assertThat(rs.next().intValue(0), is(1));
+
+            rs.close();
+        }
+
+        assertThat(asyncRs.hasNextResultSet(), is(true));
+
+        CompletableFuture<ClientAsyncResultSet<SqlRow>> nextResultFut = 
asyncRs.nextResultSet();
+        // Ensures that the next result is cached locally
+        // and subsequent calls do not request data from the server.
+        assertThat(nextResultFut, is(asyncRs.nextResultSet()));
+
+        asyncRs = await(asyncRs.nextResultSet());
+
+        // Second result set.
+        {
+            SyncResultSetAdapter<SqlRow> rs = new 
SyncResultSetAdapter<>(asyncRs);
+            assertThat(rs.hasRowSet(), is(true));
+            assertThat(rs.next().intValue(0), is(2));
+
+            rs.close();
+        }
+
+        assertThat(asyncRs.hasNextResultSet(), is(true));
+        // Ensures that the next result is cached locally
+        // and subsequent calls do not request data from the server.
+        assertThat(asyncRs.nextResultSet(), is(asyncRs.nextResultSet()));
+        asyncRs = await(asyncRs.nextResultSet());
+
+        // Third result set.
+        {
+            SyncResultSetAdapter<SqlRow> rs = new 
SyncResultSetAdapter<>(asyncRs);
+            assertThat(rs.hasRowSet(), is(true));
+            assertThat(rs.next().intValue(0), is(3));
+
+            rs.close();
+        }
+
+        assertThat(asyncRs.hasNextResultSet(), is(false));
+    }
+
+    @Test
+    void basicMultiStatementQuery() {
+        String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);"
+                + "INSERT INTO test VALUES (3, 3);"
+                + "UPDATE test SET val=7 WHERE id=3;"
+                + "EXPLAIN PLAN FOR SELECT * FROM test;"
+                + "SELECT * FROM test;"
+                + "DELETE FROM test;"
+                + "DROP TABLE IF EXISTS non_existing_table;";
+
+        List<ClientAsyncResultSet<SqlRow>> resultSets = 
fetchAllResults(runSql(sql));
+        Iterator<ClientAsyncResultSet<SqlRow>> curItr = resultSets.iterator();
+
+        ResultValidator.ddl(curItr.next(), true);
+        ResultValidator.dml(curItr.next(), 1);
+        ResultValidator.dml(curItr.next(), 1);
+        assertNotNull(curItr.next()); // skip EXPLAIN.
+        ResultValidator.singleRow(curItr.next(), 3, 7);
+        ResultValidator.dml(curItr.next(), 1);
+        ResultValidator.ddl(curItr.next(), false);
+
+        assertThat(curItr.hasNext(), is(false));
+
+        resultSets.forEach(AsyncResultSet::closeAsync);
+
+        // Ensures that the script is executed completely, even if the cursor 
data has not been read.
+        executeSql("INSERT INTO test VALUES (1, 1);"
+                + "INSERT INTO test VALUES (2, 2);"
+                + "SELECT * FROM test;"
+                + "INSERT INTO test VALUES (3, 3);");
+
+        expectRowsCount(null, "test", 3);
+    }
+
+    @Test
+    public void txControlStatement() {
+        String query = "START TRANSACTION; COMMIT";
+
+        // Execution of the TX_CONTROL statement is allowed.
+        {
+            EnumSet<QueryModifier> modifiers = EnumSet.of(
+                    QueryModifier.ALLOW_TX_CONTROL,
+                    QueryModifier.ALLOW_MULTISTATEMENT
+            );
+
+            ClientAsyncResultSet<SqlRow> startFuture = runSql((Transaction) 
null, null, modifiers, query);
+            List<ClientAsyncResultSet<SqlRow>> resultSets = 
fetchAllResults(startFuture);
+
+            assertThat(resultSets, hasSize(2));
+
+            ClientAsyncResultSet<SqlRow> rs = resultSets.get(0);
+            assertThat(rs.hasNextResultSet(), is(true));
+            assertThat(rs.hasRowSet(), is(false));
+            assertThat(rs.wasApplied(), is(false));
+            assertThat(rs.affectedRows(), is(-1L));
+
+            rs = resultSets.get(1);
+            assertThat(rs.hasNextResultSet(), is(false));
+            assertThat(rs.hasRowSet(), is(false));
+            assertThat(rs.wasApplied(), is(false));
+            assertThat(rs.affectedRows(), is(-1L));
+        }
+
+        // Execution of the TX_CONTROL statement is not allowed.
+        {
+            EnumSet<QueryModifier> modifiers = EnumSet.of(
+                    QueryModifier.ALLOW_ROW_SET_RESULT,
+                    QueryModifier.ALLOW_MULTISTATEMENT
+            );
+
+            assertThrowsSqlException(
+                    STMT_VALIDATION_ERR,
+                    "Invalid SQL statement type.",
+                    () -> runSql((Transaction) null, null, modifiers, query)
+            );
+        }
+    }
+
+    @Test
+    void throwsNoSuchElementExceptionIfNoMoreResults() {
+        ClientAsyncResultSet<SqlRow> resulSet = runSql("SELECT 1");
+
+        assertThat(resulSet.hasNextResultSet(), is(false));
+
+        //noinspection ThrowableNotThrown
+        assertThrows(NoSuchElementException.class, () -> 
await(resulSet.nextResultSet()), "Query has no more results");
+    }
+
+    @Test
+    void queryWithDynamicParameters() {
+        String sql = "CREATE TABLE test (id INT PRIMARY KEY, val VARCHAR 
DEFAULT '3');"
+                + "INSERT INTO test VALUES(?, ?);"
+                + "INSERT INTO test VALUES(?, DEFAULT);"
+                + "INSERT INTO test VALUES(?, ?);";
+
+        executeSql(sql, 0, "1", 2, 4, "5");
+        expectRowsCount(null, "test", 3);
+    }
+
+    @Test
+    void explicitTransaction() {
+        executeSql("CREATE TABLE test (id INT PRIMARY KEY);");
+
+        Transaction tx = client().transactions().begin();
+        executeSql(tx, "INSERT INTO test VALUES (0); INSERT INTO test VALUES 
(1); INSERT INTO test VALUES (2)");
+
+        expectRowsCount(tx, "test", 3);
+        expectRowsCount(null, "test", 0);
+
+        tx.commit();
+
+        expectRowsCount(null, "test", 3);
+    }
+
+    @Test
+    void queryWithIncorrectNumberOfDynamicParametersFailsWithValidationError() 
{
+        String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);"
+                + "INSERT INTO test VALUES(?, ?);"
+                + "INSERT INTO test VALUES(?, ?);";
+
+        String expectedMessage = "Unexpected number of query parameters";
+
+        assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () -> 
runSql(sql, 0));
+        assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () -> 
runSql(sql, 0, 1, 2, 3, 4, 5));
+    }
+
+    @Test
+    void scriptStopsExecutionOnError() {
+        // Runtime error.
+        {
+            assertThrowsSqlException(
+                    RUNTIME_ERR,
+                    "Division by zero",
+                    () -> executeSql(
+                            "CREATE TABLE test (id INT PRIMARY KEY);"
+                                    + "INSERT INTO test VALUES (2/0);" // 
Runtime error.
+                                    + "INSERT INTO test VALUES (0)"
+                    )
+            );
+
+            expectRowsCount(null, "test", 0);
+        }
+
+        // Validation error.
+        {
+            assertThrowsSqlException(
+                    STMT_VALIDATION_ERR,
+                    "Values passed to VALUES operator must have compatible 
types",
+                    () -> executeSql(
+                            "INSERT INTO test VALUES (?), (?)",
+                            "1", 2
+                    )
+            );
+
+            expectRowsCount(null, "test", 0);
+        }
+
+        // Internal error.
+        {
+            assertThrowsSqlException(
+                    RUNTIME_ERR,
+                    "Subquery returned more than 1 value",
+                    () -> executeSql(
+                            "INSERT INTO test VALUES(0);"
+                                    + "INSERT INTO test VALUES(1);"
+                                    + "DELETE FROM test WHERE id = (SELECT id 
FROM test);" // Internal error.
+                                    + "INSERT INTO test VALUES(2);"
+                    )
+            );
+
+            expectRowsCount(null, "test", 2);
+        }
+
+        // Same as above, but inside script managed transaction.
+        {
+            assertThrowsSqlException(
+                    RUNTIME_ERR,
+                    "Subquery returned more than 1 value",
+                    () -> executeSql(
+                            "START TRANSACTION;"
+                                    + "INSERT INTO test VALUES(2);"
+                                    + "INSERT INTO test VALUES(3);"
+                                    + "DELETE FROM test WHERE id = (SELECT id 
FROM test);" // Internal error.
+                                    + "INSERT INTO test VALUES(4);"
+                                    + "COMMIT;"
+                    )
+            );
+
+            expectRowsCount(null, "test", 2);
+        }
+
+        // Attempt to start script-managed transaction inside external 
transaction.
+        {
+            Transaction tx = client().transactions().begin();
+
+            assertThrowsSqlException(
+                    TX_CONTROL_INSIDE_EXTERNAL_TX_ERR,
+                    "Transaction control statement cannot be executed within 
an external transaction.",
+                    () -> executeSql(
+                            tx,
+                            "START TRANSACTION; COMMIT;"
+                    )
+            );
+
+            tx.rollback();
+        }
+
+        // Internal error due to transaction exception.
+        {
+            Transaction tx = client().transactions().begin();
+            client().sql().execute(tx, "INSERT INTO test VALUES(2);").close();
+            tx.commit();
+
+            assertThrowsSqlException(
+                    TX_ALREADY_FINISHED_ERR,
+                    "Transaction is already finished",
+                    () -> executeSql(
+                            tx,
+                            "INSERT INTO test VALUES(3); INSERT INTO test 
VALUES(4);"
+                    )
+            );
+
+            expectRowsCount(null, "test", 3);
+        }
+    }
+
+    @Test
+    public void cancelScript() {
+        StringBuilder query = new StringBuilder();
+
+        int statementsCount = 100;
+
+        for (int j = 0; j < statementsCount; j++) {
+            query.append("SELECT x FROM TABLE(SYSTEM_RANGE(0, 
100))").append(";");
+        }
+
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        List<ClientAsyncResultSet<SqlRow>> allResults = 
fetchAllResults(runSql((Transaction) null, token, null, query.toString()));
+
+        assertThat(allResults, hasSize(statementsCount));
+
+        cancelHandle.cancel();
+
+        allResults.forEach(rs -> {
+            expectQueryCancelled(() -> {
+                AsyncResultSet<SqlRow> res;
+                do {
+                    res = await(rs.fetchNextPage());
+                } while (res.hasMorePages());
+            });
+
+            rs.closeAsync();
+        });
+    }
+
+    private void expectRowsCount(@Nullable Transaction tx, String table, long 
expectedCount) {
+        try (ResultSet<SqlRow> rs = client().sql().execute(tx, "SELECT 
COUNT(*) FROM " + table)) {
+            assertThat(rs.next().longValue(0), is(expectedCount));
+        }
+    }
+
+    private SqlQueryProcessor queryProcessor(int idx) {
+        return ((SqlQueryProcessor) 
unwrapIgniteImpl(server(idx)).queryEngine());
+    }
+
+    private TxManager txManager(int idx) {
+        return unwrapIgniteImpl(server(idx)).txManager();
+    }
+
+    private int countResources() {
+        int count = 0;
+
+        for (int i = 0; i < nodes(); i++) {
+            ClientResourceRegistry resources = 
unwrapIgniteImpl(server(i)).clientInboundMessageHandler().resources();
+
+            count += resources.size();
+        }
+
+        return count;
+    }
+
+    private ClientAsyncResultSet<SqlRow> runSql(String query, Object ... args) 
{
+        return runSql(null, null, null, query, args);
+    }
+
+    private ClientAsyncResultSet<SqlRow> runSql(
+            @Nullable Transaction tx,
+            @Nullable CancellationToken cancelToken,
+            @Nullable Set<QueryModifier> queryModifiers,
+            String query,
+            Object... args
+    ) {
+        ClientSql clientSql = (ClientSql) client().sql();
+        Statement stmt = 
clientSql.statementBuilder().query(query).pageSize(1).build();
+
+        return (ClientAsyncResultSet<SqlRow>) await(
+                clientSql.executeAsyncInternal(
+                        tx,
+                        (Mapper<SqlRow>) null,
+                        cancelToken,
+                        queryModifiers == null ? QueryModifier.ALL : 
queryModifiers,
+                        stmt,
+                        args
+                )
+        );
+    }
+
+    private void executeSql(String sql, Object... args) {
+        executeSql(null, sql, args);
+    }
+
+    private void executeSql(@Nullable Transaction tx, String sql, Object ... 
args) {
+        fetchAllResults(runSql(tx, null, null, sql, args))
+                .forEach(rs -> await(rs.closeAsync()));
+    }
+
+    private List<ClientAsyncResultSet<SqlRow>> 
fetchAllResults(ClientAsyncResultSet<SqlRow> resultSet) {
+        List<ClientAsyncResultSet<SqlRow>> resultSets = new ArrayList<>();
+
+        resultSets.add(resultSet);
+        resultsToClose.add(resultSet);
+
+        ClientAsyncResultSet<SqlRow> resultSet0 = resultSet;
+
+        while (resultSet0.hasNextResultSet()) {
+            resultSet0 = await(resultSet0.nextResultSet());
+
+            assertNotNull(resultSet0);
+
+            resultSets.add(resultSet0);
+
+            resultsToClose.add(resultSet0);
+        }
+
+        return resultSets;
+    }
+
+    private static class ResultValidator {
+        static void dml(ClientAsyncResultSet<SqlRow> resultSet, long 
affectedRows) {
+            assertThat(resultSet.hasRowSet(), is(false));
+            assertThat(resultSet.affectedRows(), is(affectedRows));
+        }
+
+        private static void ddl(ClientAsyncResultSet<SqlRow> resultSet, 
boolean wasApplied) {
+            assertThat(resultSet.hasRowSet(), is(false));
+            assertThat(resultSet.wasApplied(), is(wasApplied));
+        }
+
+        private static void singleRow(ClientAsyncResultSet<SqlRow> resultSet, 
Object... expected) {
+            assertThat(resultSet.hasRowSet(), is(true));
+            Iterator<SqlRow> pageIter = resultSet.currentPage().iterator();
+            SqlRow row = pageIter.next();
+            int rowSize = row.metadata().columns().size();
+            List<Object> actual = new ArrayList<>(rowSize);
+
+            for (int i = 0; i < rowSize; i++) {
+                actual.add(row.value(i));
+            }
+
+            assertThat(List.of(expected), equalTo(actual));
+            assertThat(pageIter.hasNext(), is(false));
+        }
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
index 7f63ace6266..a9ae5856c41 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
@@ -760,11 +760,12 @@ public class ItThinClientSqlTest extends 
ItAbstractThinClientTest {
         Statement ddlStatement = client().sql().createStatement("CREATE TABLE 
x(id INT PRIMARY KEY)");
         Statement dmlStatement = client().sql().createStatement("INSERT INTO x 
VALUES (1), (2), (3)");
         Statement selectStatement = client().sql().createStatement("SELECT * 
FROM x");
+        Statement multiStatement = client().sql().createStatement("SELECT 1; 
SELECT 2;");
 
         BiConsumer<Statement, Set<QueryModifier>> check = (stmt, types) -> {
             await(sql.executeAsyncInternal(
                     null,
-                    () -> SqlRow.class,
+                    null,
                     null,
                     types,
                     stmt
@@ -816,10 +817,20 @@ public class ItThinClientSqlTest extends 
ItAbstractThinClientTest {
             );
         }
 
-        // No exception expected with correct modifiers.
-        check.accept(ddlStatement, QueryModifier.ALL);
-        check.accept(dmlStatement, QueryModifier.ALL);
-        check.accept(selectStatement, QueryModifier.ALL);
+        // Incorrect modifier for multi-statement.
+        {
+            IgniteTestUtils.assertThrows(
+                    SqlException.class,
+                    () -> check.accept(multiStatement, 
QueryModifier.SINGLE_STMT_MODIFIERS),
+                    "Multiple statements are not allowed."
+            );
+        }
+
+        // No exception expected with correct query modifier.
+        check.accept(ddlStatement, QueryModifier.SINGLE_STMT_MODIFIERS);
+        check.accept(dmlStatement, QueryModifier.SINGLE_STMT_MODIFIERS);
+        check.accept(selectStatement, QueryModifier.SINGLE_STMT_MODIFIERS);
+        check.accept(multiStatement, QueryModifier.ALL);
     }
 
     private static class Pojo {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index e9a12b3ffa9..2112fad417c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -77,6 +77,11 @@ public class AsyncResultSetImpl<T> implements 
AsyncResultSet<T> {
         return cursor.partitionAwarenessMetadata();
     }
 
+    /** Returns the SQL query cursor held by this result set. */
+    public AsyncSqlCursor<InternalSqlRow> cursor() {
+        return cursor;
+    }
+
     /** {@inheritDoc} */
     @Override
     public boolean hasRowSet() {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
index 1508984a20f..c47ef5d4419 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.catalog.CatalogValidationException;
 import org.apache.ignite.internal.lang.IgniteExceptionMapper;
 import org.apache.ignite.internal.lang.IgniteExceptionMappersProvider;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import 
org.apache.ignite.internal.sql.engine.TxControlInsideExternalTxNotSupportedException;
 import 
org.apache.ignite.internal.sql.engine.exec.RemoteFragmentExecutionException;
 import org.apache.ignite.lang.ErrorGroups.Common;
 import org.apache.ignite.lang.IgniteException;
@@ -76,6 +77,9 @@ public class SqlExceptionMapperProvider implements 
IgniteExceptionMappersProvide
         mappers.add(unchecked(InternalCompilerException.class,
                 err -> new SqlException(Common.INTERNAL_ERR, "Expression 
compiler error. " + err.getMessage(), err)));
 
+        
mappers.add(unchecked(TxControlInsideExternalTxNotSupportedException.class,
+                err -> new SqlException(err.code(), err.getMessage(), err)));
+
         return mappers;
     }
 }

Reply via email to