This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ec70aea639 IGNITE-23430: Sql. Provide an ability to cancel query 
before first page ready (#4615)
ec70aea639 is described below

commit ec70aea639c184dd883d9fc09d050514764d93fe
Author: Max Zhuravkov <shh...@gmail.com>
AuthorDate: Mon Nov 11 08:22:46 2024 +0200

    IGNITE-23430: Sql. Provide an ability to cancel query before first page 
ready (#4615)
---
 .../java/org/apache/ignite/lang/CancelHandle.java  |  62 +++++
 .../org/apache/ignite/lang/CancelHandleImpl.java   | 163 ++++++++++++
 .../org/apache/ignite/lang/CancellationToken.java} |  13 +-
 .../main/java/org/apache/ignite/sql/IgniteSql.java | 189 ++++++++++++-
 .../client/handler/JdbcQueryEventHandlerImpl.java  |   2 +
 .../requests/sql/ClientSqlExecuteRequest.java      |   9 +-
 .../handler/JdbcQueryEventHandlerImplTest.java     |   4 +-
 .../ignite/internal/client/sql/ClientSql.java      |  41 ++-
 .../client/fakes/FakeIgniteQueryProcessor.java     |  10 +-
 .../apache/ignite/internal/util/Cancellable.java   |   1 +
 .../org/apache/ignite/lang/CancelHandleHelper.java |  71 +++++
 .../ignite/lang/CancelHandleHelperSelfTest.java    | 292 +++++++++++++++++++++
 .../benchmark/AbstractMultiNodeBenchmark.java      |   4 +-
 .../ignite/internal/benchmark/SelectBenchmark.java |   4 +-
 .../benchmark/SqlMultiStatementBenchmark.java      |   4 +-
 .../internal/restart/RestartProofIgniteSql.java    |  76 +++++-
 .../internal/sql/api/ItSqlAsynchronousApiTest.java |  98 +++++++
 .../sql/api/ItSqlClientAsynchronousApiTest.java    |  13 +
 .../sql/api/ItSqlClientSynchronousApiTest.java     |  13 +
 .../internal/sql/api/ItSqlSynchronousApiTest.java  |  88 +++++++
 .../sql/engine/BaseSqlMultiStatementTest.java      |   2 +-
 .../internal/sql/engine/ItQueryCancelTest.java     | 255 ++++++++++++++++++
 .../ignite/internal/sql/api/IgniteSqlImpl.java     |  65 +++--
 .../sql/api/PublicApiThreadingIgniteSql.java       |  37 ++-
 .../ignite/internal/sql/engine/QueryCancel.java    |   4 +-
 .../ignite/internal/sql/engine/QueryProcessor.java |  15 +-
 .../internal/sql/engine/SqlQueryProcessor.java     |  10 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      |  25 +-
 .../ignite/internal/sql/engine/exec/fsm/Query.java |   4 +-
 .../sql/engine/exec/fsm/QueryExecutor.java         |  22 +-
 .../sql/engine/prepare/PrepareServiceImpl.java     |   2 -
 .../ignite/internal/sql/api/IgniteSqlImplTest.java |  20 +-
 .../sql/engine/exec/TransactionEnlistTest.java     |  10 +-
 .../sql/engine/framework/TestBuilders.java         |  15 +-
 .../internal/sql/engine/util/QueryCheckerTest.java |  10 +-
 .../internal/sql/engine/util/QueryCheckerImpl.java |  10 +-
 36 files changed, 1546 insertions(+), 117 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/CancelHandle.java 
b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandle.java
new file mode 100644
index 0000000000..1f81df251b
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandle.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A handle which may be used to request the cancellation of execution.
+ */
+public interface CancelHandle {
+
+    /** A factory method to create a handle. */
+    static CancelHandle create() {
+        return new CancelHandleImpl();
+    }
+
+    /**
+     * Abruptly terminates an execution of an associated process.
+     *
+     * <p>Control flow will return after the process has been terminated and 
the resources associated with that process have been freed.
+     */
+    void cancel();
+
+    /**
+     * Abruptly terminates an execution of an associated process.
+     *
+     * @return A future that will be completed after the process has been 
terminated and the resources associated with that process have
+     *         been freed.
+     */
+    CompletableFuture<Void> cancelAsync();
+
+    /**
+     * Flag indicating whether cancellation was requested or not.
+     *
+     * <p>This method will return true even if cancellation has not been 
completed yet.
+     *
+     * @return {@code true} when cancellation was requested.
+     */
+    boolean isCancelled();
+
+    /**
+     * Issue a token associated with this handle.
+     *
+     * <p>Token is reusable, meaning the same token may be used to link 
several executions into a single cancellable.
+     */
+    CancellationToken token();
+}
diff --git 
a/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java 
b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
new file mode 100644
index 0000000000..a28c97e3d0
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.ErrorGroups.Common;
+
+/** Implementation of {@link CancelHandle}. */
+final class CancelHandleImpl implements CancelHandle {
+
+    private final CompletableFuture<Void> cancelFut = new 
CompletableFuture<>();
+
+    private final CancellationTokenImpl token;
+
+    CancelHandleImpl() {
+        this.token = new CancellationTokenImpl(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void cancel() {
+        doCancelAsync();
+
+        cancelFut.join();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> cancelAsync() {
+        doCancelAsync();
+
+        // Make a copy of internal future, so that it is not possible to 
complete it
+        return cancelFut.copy();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isCancelled() {
+        return token.isCancelled();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CancellationToken token() {
+        return token;
+    }
+
+    private void doCancelAsync() {
+        token.cancel();
+    }
+
+    static final class CancellationTokenImpl implements CancellationToken {
+
+        private final ArrayDeque<Cancellation> cancellations = new 
ArrayDeque<>();
+
+        private final CancelHandleImpl handle;
+
+        private final Object mux = new Object();
+
+        private volatile CompletableFuture<Void> cancelFut;
+
+        CancellationTokenImpl(CancelHandleImpl handle) {
+            this.handle = handle;
+        }
+
+        void addCancelAction(Runnable action, CompletableFuture<?> fut) {
+            Cancellation cancellation = new Cancellation(action, fut);
+
+            if (cancelFut != null) {
+                cancellation.run();
+            } else {
+                synchronized (mux) {
+                    if (cancelFut == null) {
+                        cancellations.add(cancellation);
+                        return;
+                    }
+                }
+
+                cancellation.run();
+            }
+        }
+
+        boolean isCancelled() {
+            return cancelFut != null;
+        }
+
+        @SuppressWarnings("rawtypes")
+        void cancel() {
+            if (cancelFut != null) {
+                return;
+            }
+
+            synchronized (mux) {
+                if (cancelFut != null) {
+                    return;
+                }
+
+                // First assemble all completion futures
+                CompletableFuture[] futures = cancellations.stream()
+                        .map(c -> c.completionFut)
+                        .toArray(CompletableFuture[]::new);
+
+                // handle.cancelFut completes when all cancellation futures 
complete.
+                cancelFut = CompletableFuture.allOf(futures).whenComplete((r, 
t) -> {
+                    handle.cancelFut.complete(null);
+                });
+            }
+
+            IgniteException error = null;
+
+            // Run cancellation actions outside of lock
+            for (Cancellation cancellation : cancellations) {
+                try {
+                    cancellation.run();
+                } catch (Throwable t) {
+                    if (error == null) {
+                        error = new IgniteException(Common.INTERNAL_ERR, 
"Failed to cancel an operation");
+                    }
+                    error.addSuppressed(t);
+                }
+            }
+
+            if (error != null) {
+                throw error;
+            }
+        }
+    }
+
+    /**
+     * Stores an action that triggers a cancellation and a completable future 
that completes when a resource is closed.
+     */
+    private static class Cancellation {
+
+        private final Runnable cancelAction;
+
+        private final CompletableFuture<?> completionFut;
+
+        private Cancellation(Runnable cancelAction, CompletableFuture<?> 
completionFut) {
+            this.cancelAction = cancelAction;
+            this.completionFut = completionFut;
+        }
+
+        private void run() {
+            cancelAction.run();
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java 
b/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
similarity index 68%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
copy to modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
index 5f6cd721e1..52857cd317 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
@@ -15,16 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.util;
+package org.apache.ignite.lang;
 
 /**
- * A {@code Cancellable} represents a process or an operation that can be 
canceled.
+ * Cancellation token is an object that is issued by {@link CancelHandle} and 
can be used by an operation or a resource to observe a signal
+ * to terminate it.
  */
-public interface Cancellable {
-    /**
-     * Cancels the ongoing operation or process or do nothing if it has been 
already cancelled.
-     *
-     * @param timeout If process was cancelled due to timeout.
-     */
-    void cancel(boolean timeout);
+public interface CancellationToken {
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java 
b/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java
index d2beea304d..fecd4bfca1 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.sql;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
@@ -52,7 +53,26 @@ public interface IgniteSql {
      * @return SQL query result set.
      * @throws SqlException If failed.
      */
-    ResultSet<SqlRow> execute(@Nullable Transaction transaction, String query, 
@Nullable Object... arguments);
+    default ResultSet<SqlRow> execute(@Nullable Transaction transaction, 
String query, @Nullable Object... arguments) {
+        return execute(transaction, (CancellationToken) null, query, 
arguments);
+    }
+
+    /**
+     * Executes a single SQL query.
+     *
+     * @param transaction Transaction to execute the query within or {@code 
null}.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @param query SQL query template.
+     * @param arguments Arguments for the template (optional).
+     * @return SQL query result set.
+     * @throws SqlException If failed.
+     */
+    ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    );
 
     /**
      * Executes a single SQL statement.
@@ -62,12 +82,54 @@ public interface IgniteSql {
      * @param arguments Arguments for the statement.
      * @return SQL query result set.
      */
-    ResultSet<SqlRow> execute(@Nullable Transaction transaction, Statement 
statement, @Nullable Object... arguments);
+    default ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return execute(transaction, (CancellationToken) null, statement, 
arguments);
+    }
+
+    /**
+     * Executes a single SQL statement.
+     *
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @param statement SQL statement to execute.
+     * @param arguments Arguments for the statement.
+     * @return SQL query result set.
+     */
+    ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            @Nullable Object... arguments
+    );
+
+    /**
+     * Executes single SQL statement and maps results to objects with the 
provided mapper.
+     *
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
+     * @param query SQL query template.
+     * @param arguments Arguments for the statement.
+     * @param <T> A type of object contained in result set.
+     * @return SQL query results set.
+     */
+    default <T> ResultSet<T> execute(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return execute(transaction, mapper, null, query, arguments);
+    }
 
     /**
      * Executes single SQL statement and maps results to objects with the 
provided mapper.
      *
      * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
      * @param query SQL query template.
      * @param arguments Arguments for the statement.
@@ -77,8 +139,10 @@ public interface IgniteSql {
     <T> ResultSet<T> execute(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             String query,
-            @Nullable Object... arguments);
+            @Nullable Object... arguments
+    );
 
     /**
      * Executes single SQL statement and maps results to objects with the 
provided mapper.
@@ -90,27 +154,128 @@ public interface IgniteSql {
      * @param <T> A type of object contained in result set.
      * @return SQL query results set.
      */
+    default <T> ResultSet<T> execute(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return execute(transaction, mapper, null, statement, arguments);
+    }
+
+    /**
+     * Executes single SQL statement and maps results to objects with the 
provided mapper.
+     *
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
+     * @param statement SQL statement to execute.
+     * @param arguments Arguments for the statement.
+     * @param <T> A type of object contained in result set.
+     * @return SQL query results set.
+     */
     <T> ResultSet<T> execute(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
-            @Nullable Object... arguments);
+            @Nullable Object... arguments
+    );
+
+    /**
+     * Executes SQL query in an asynchronous way.
+     *
+     * @param transaction Transaction to execute the query within or {@code 
null}.
+     * @param query SQL query template.
+     * @param arguments Arguments for the template (optional).
+     * @return Operation future.
+     * @throws SqlException If failed.
+     */
+    default CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return executeAsync(transaction, (CancellationToken) null, query, 
arguments);
+    }
 
     /**
      * Executes SQL query in an asynchronous way.
      *
      * @param transaction Transaction to execute the query within or {@code 
null}.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param query SQL query template.
      * @param arguments Arguments for the template (optional).
      * @return Operation future.
      * @throws SqlException If failed.
      */
-    CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments);
+    CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    );
+
+    /**
+     * Executes an SQL statement asynchronously.
+     *
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param statement SQL statement to execute.
+     * @param arguments Arguments for the statement.
+     * @return Operation future.
+     * @throws SqlException If failed.
+     */
+    default CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return executeAsync(transaction, (CancellationToken) null, statement, 
arguments);
+    }
+
+    /**
+     * Executes SQL statement in an asynchronous way and maps results to 
objects with the provided mapper.
+     *
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
+     * @param query SQL query template.
+     * @param arguments Arguments for the statement.
+     * @param <T> A type of object contained in result set.
+     * @return Operation future.
+     */
+    default <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return executeAsync(transaction, mapper, null, query, arguments);
+    }
+
+    /**
+     * Executes SQL statement in an asynchronous way and maps results to 
objects with the provided mapper.
+     *
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
+     * @param statement SQL statement to execute.
+     * @param arguments Arguments for the statement.
+     * @param <T> A type of object contained in result set.
+     * @return Operation future.
+     */
+    default <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return executeAsync(transaction, mapper, null, statement, arguments);
+    }
 
     /**
      * Executes an SQL statement asynchronously.
      *
      * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param statement SQL statement to execute.
      * @param arguments Arguments for the statement.
      * @return Operation future.
@@ -118,13 +283,16 @@ public interface IgniteSql {
      */
     CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
-            @Nullable Object... arguments);
+            @Nullable Object... arguments
+    );
 
     /**
      * Executes SQL statement in an asynchronous way and maps results to 
objects with the provided mapper.
      *
      * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
      * @param query SQL query template.
      * @param arguments Arguments for the statement.
@@ -134,13 +302,16 @@ public interface IgniteSql {
     <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             String query,
-            @Nullable Object... arguments);
+            @Nullable Object... arguments
+    );
 
     /**
      * Executes SQL statement in an asynchronous way and maps results to 
objects with the provided mapper.
      *
      * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
      * @param statement SQL statement to execute.
      * @param arguments Arguments for the statement.
@@ -150,8 +321,10 @@ public interface IgniteSql {
     <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
-            @Nullable Object... arguments);
+            @Nullable Object... arguments
+    );
 
     /**
      * Executes a batched SQL query. Only DML queries are supported.
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index a73bad6915..75a005a7c7 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -154,6 +154,7 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
                 properties,
                 igniteTransactions.observableTimestampTracker(),
                 tx,
+                null,
                 req.sqlQuery(),
                 req.arguments() == null ? OBJECT_EMPTY_ARRAY : req.arguments()
         );
@@ -280,6 +281,7 @@ public class JdbcQueryEventHandlerImpl extends 
JdbcHandlerBase implements JdbcQu
                 properties,
                 igniteTransactions.observableTimestampTracker(),
                 tx,
+                null,
                 sql,
                 arg == null ? OBJECT_EMPTY_ARRAY : arg
         );
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index c3fa0795fe..1cae217b90 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -167,8 +167,13 @@ public class ClientSqlExecuteRequest {
                     .build();
 
             CompletableFuture<AsyncResultSet<SqlRow>> fut = qryProc.queryAsync(
-                            properties, 
transactions.observableTimestampTracker(), (InternalTransaction) transaction, 
query, arguments)
-                    .thenCompose(cur -> cur.requestNextAsync(pageSize)
+                            properties,
+                            transactions.observableTimestampTracker(),
+                            (InternalTransaction) transaction,
+                            null,
+                            query,
+                            arguments
+                    ).thenCompose(cur -> cur.requestNextAsync(pageSize)
                             .thenApply(
                                     batchRes -> new AsyncResultSetImpl<>(
                                             cur,
diff --git 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
index d26a857dee..7ece7f2ccb 100644
--- 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
+++ 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
@@ -162,7 +162,7 @@ class JdbcQueryEventHandlerImplTest extends 
BaseIgniteAbstractTest {
 
     @Test
     public void singleTxUsedForMultipleOperations() {
-        when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class)))
+        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class)))
                 .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("Expected")));
 
         InternalTransaction tx = mock(InternalTransaction.class);
@@ -196,7 +196,7 @@ class JdbcQueryEventHandlerImplTest extends 
BaseIgniteAbstractTest {
 
         verify(igniteTransactions, times(5)).observableTimestampTracker();
         verifyNoMoreInteractions(igniteTransactions);
-        verify(queryProcessor, times(5)).queryAsync(any(), any(), any(), 
any(), any(Object[].class));
+        verify(queryProcessor, times(5)).queryAsync(any(), any(), any(), 
any(), any(), any(Object[].class));
     }
 
     private long acquireConnectionId() {
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 437b945c4d..60e5175a13 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.sql.StatementBuilderImpl;
 import org.apache.ignite.internal.sql.StatementImpl;
 import org.apache.ignite.internal.sql.SyncResultSetAdapter;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
@@ -91,11 +92,16 @@ public class ClientSql implements IgniteSql {
 
     /** {@inheritDoc} */
     @Override
-    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String 
query, @Nullable Object... arguments) {
+    public ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    ) {
         Objects.requireNonNull(query);
 
         try {
-            return new SyncResultSetAdapter<>(executeAsync(transaction, query, 
arguments).join());
+            return new SyncResultSetAdapter<>(executeAsync(transaction, 
cancellationToken, query, arguments).join());
         } catch (CompletionException e) {
             throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
@@ -103,11 +109,16 @@ public class ClientSql implements IgniteSql {
 
     /** {@inheritDoc} */
     @Override
-    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, 
Statement statement, @Nullable Object... arguments) {
+    public ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
         Objects.requireNonNull(statement);
 
         try {
-            return new SyncResultSetAdapter<>(executeAsync(transaction, 
statement, arguments).join());
+            return new SyncResultSetAdapter<>(executeAsync(transaction, 
cancellationToken, statement, arguments).join());
         } catch (CompletionException e) {
             throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
@@ -118,12 +129,14 @@ public class ClientSql implements IgniteSql {
     public <T> ResultSet<T> execute(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             String query,
-            @Nullable Object... arguments) {
+            @Nullable Object... arguments
+    ) {
         Objects.requireNonNull(query);
 
         try {
-            return new SyncResultSetAdapter<>(executeAsync(transaction, 
mapper, query, arguments).join());
+            return new SyncResultSetAdapter<>(executeAsync(transaction, 
mapper, cancellationToken, query, arguments).join());
         } catch (CompletionException e) {
             throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
@@ -134,12 +147,14 @@ public class ClientSql implements IgniteSql {
     public <T> ResultSet<T> execute(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
-            @Nullable Object... arguments) {
+            @Nullable Object... arguments
+    ) {
         Objects.requireNonNull(statement);
 
         try {
-            return new SyncResultSetAdapter<>(executeAsync(transaction, 
mapper, statement, arguments).join());
+            return new SyncResultSetAdapter<>(executeAsync(transaction, 
mapper, cancellationToken, statement, arguments).join());
         } catch (CompletionException e) {
             throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
@@ -177,22 +192,24 @@ public class ClientSql implements IgniteSql {
     @Override
     public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
             String query,
             @Nullable Object... arguments) {
         Objects.requireNonNull(query);
 
         StatementImpl statement = new StatementImpl(query);
 
-        return executeAsync(transaction, statement, arguments);
+        return executeAsync(transaction, cancellationToken, statement, 
arguments);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments) {
-        return executeAsync(transaction, sqlRowMapper, statement, arguments);
+        return executeAsync(transaction, sqlRowMapper, cancellationToken, 
statement, arguments);
     }
 
     /** {@inheritDoc} */
@@ -200,13 +217,14 @@ public class ClientSql implements IgniteSql {
     public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             String query,
             @Nullable Object... arguments) {
         Objects.requireNonNull(query);
 
         StatementImpl statement = new StatementImpl(query);
 
-        return executeAsync(transaction, mapper, statement, arguments);
+        return executeAsync(transaction, mapper, cancellationToken, statement, 
arguments);
     }
 
     /** {@inheritDoc} */
@@ -214,6 +232,7 @@ public class ClientSql implements IgniteSql {
     public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments) {
         Objects.requireNonNull(statement);
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index 6163a589bb..6c5eef1132 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -32,6 +32,7 @@ import 
org.apache.ignite.internal.sql.engine.property.SqlProperties;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.Nullable;
 
@@ -44,8 +45,12 @@ public class FakeIgniteQueryProcessor implements 
QueryProcessor {
     String lastScript;
 
     @Override
-    public CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties 
properties,
-            @Nullable InternalTransaction transaction, String qry, Object... 
params) {
+    public CompletableFuture<QueryMetadata> prepareSingleAsync(
+            SqlProperties properties,
+            @Nullable InternalTransaction transaction,
+            String qry,
+            Object... params
+    ) {
         throw new UnsupportedOperationException();
     }
 
@@ -54,6 +59,7 @@ public class FakeIgniteQueryProcessor implements 
QueryProcessor {
             SqlProperties properties,
             HybridTimestampTracker observableTimeTracker,
             @Nullable InternalTransaction transaction,
+            @Nullable CancellationToken cancellationToken,
             String qry,
             Object... params
     ) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
index 5f6cd721e1..f5d59fc717 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.util;
 /**
  * A {@code Cancellable} represents a process or an operation that can be 
canceled.
  */
+@FunctionalInterface
 public interface Cancellable {
     /**
      * Cancels the ongoing operation or process or do nothing if it has been 
already cancelled.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java 
b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
new file mode 100644
index 0000000000..933f0338e8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.CancelHandleImpl.CancellationTokenImpl;
+
+/**
+ * Utility class to provide direct access to internals of {@link 
CancelHandleImpl}.
+ */
+public final class CancelHandleHelper {
+
+    private CancelHandleHelper() {
+
+    }
+
+    /**
+     * Attaches a cancellable operation to the given token. A cancellation 
procedure started by its handle completes
+     * when {@code completionFut} completes.
+     *
+     * <p>NOTE: If the handle this token is associated with was cancelled or 
its cancellation was requested,
+     * this method immediately invokes {@code cancelAction.run()} and in this 
case
+     * <b>it never waits for {@code completionFut} to complete</b>.
+     *
+     * <p>The following methods request cancellation of a handle:
+     * <ul>
+     *     <li>{@link CancelHandle#cancel()}</li>
+     *     <li>{@link CancelHandle#cancelAsync()}</li>
+     * </ul>
+     *
+     * @param token Cancellation token.
+     * @param cancelAction Action that terminates an operation.
+     * @param completionFut Future that completes when operation completes and 
all resources it created are released.
+     */
+    public static void addCancelAction(
+            CancellationToken token,
+            Runnable cancelAction,
+            CompletableFuture<?> completionFut
+    ) {
+        Objects.requireNonNull(token, "token");
+        Objects.requireNonNull(cancelAction, "cancelAction");
+        Objects.requireNonNull(completionFut, "completionFut");
+
+        CancellationTokenImpl t = unwrapToken(token);
+        t.addCancelAction(cancelAction, completionFut);
+    }
+
+    private static CancellationTokenImpl unwrapToken(CancellationToken token) {
+        if (token instanceof CancellationTokenImpl) {
+            return (CancellationTokenImpl) token;
+        } else {
+            throw new IllegalArgumentException("Unexpected CancellationToken: 
" + token.getClass());
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
new file mode 100644
index 0000000000..c2b8ddb43e
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link CancelHandleHelper}.
+ */
+public class CancelHandleHelperSelfTest extends BaseIgniteAbstractTest {
+
+    @Test
+    public void testCancelSync() throws InterruptedException {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        // Initially is not cancelled
+        assertFalse(cancelHandle.isCancelled());
+
+        CountDownLatch operationLatch = new CountDownLatch(1);
+        CompletableFuture<Void> cancelFut = new CompletableFuture<>();
+
+        Runnable cancelAction = () -> {
+            try {
+                operationLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            cancelFut.complete(null);
+        };
+
+        CancelHandleHelper.addCancelAction(token, cancelAction, cancelFut);
+
+        CountDownLatch cancelHandleLatch = new CountDownLatch(1);
+
+        // Call cancel in another thread.
+        Thread thread = new Thread(() -> {
+            cancelHandle.cancel();
+            cancelHandleLatch.countDown();
+        });
+        thread.start();
+
+        // Make it possible for cancelAction to complete, because cancelHandle 
calls it in its thread.
+        operationLatch.countDown();
+
+        // Wait until sync cancel returns.
+        cancelHandleLatch.await();
+
+        // Cancellation has completed
+        assertTrue(cancelHandle.cancelAsync().isDone());
+        assertTrue(cancelHandle.isCancelled());
+        assertTrue(cancelHandle.cancelAsync().isDone());
+
+        // Should have no effect
+        cancelHandle.cancel();
+    }
+
+    @Test
+    public void testCancelAsync() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        // Initially is not cancelled
+        assertFalse(cancelHandle.isCancelled());
+        CountDownLatch operationLatch = new CountDownLatch(1);
+        CompletableFuture<Void> cancelFut = new CompletableFuture<>();
+
+        Runnable cancelAction = () -> {
+            // Run in another thread to avoid blocking.
+            Thread thread = new Thread(() -> {
+                try {
+                    operationLatch.await();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                cancelFut.complete(null);
+            });
+            thread.start();
+        };
+
+        CancelHandleHelper.addCancelAction(token, cancelAction, cancelFut);
+
+        // Request cancellation and keep the future, to join it later.
+        CompletableFuture<Void> cancelHandleFut = cancelHandle.cancelAsync();
+        assertTrue(cancelHandle.isCancelled());
+
+        assertFalse(cancelHandleFut.isDone());
+        operationLatch.countDown();
+
+        // Wait for cancellation to complete
+        cancelHandleFut.join();
+
+        assertTrue(cancelHandle.isCancelled());
+        assertTrue(cancelHandle.cancelAsync().isDone());
+    }
+
+    @Test
+    public void testCancelAsyncReturnsCopy() {
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        CompletableFuture<Void> f1 = cancelHandle.cancelAsync();
+        CompletableFuture<Void> f2 = cancelHandle.cancelAsync();
+        assertNotSame(f1, f2);
+    }
+
+    @Test
+    public void testRunCancelActionImmediatelyIfCancelSyncCalled() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        cancelHandle.cancel();
+        assertTrue(cancelHandle.isCancelled());
+
+        Runnable action = Mockito.mock(Runnable.class);
+        CompletableFuture<Void> f = new CompletableFuture<>();
+
+        // Attach it to an operation that hasn't completed yet
+        CancelHandleHelper.addCancelAction(token, action, f);
+        verify(action, times(1)).run();
+
+        cancelHandle.cancelAsync().join();
+        // We do not wait for cancellation to complete because
+        // operation has not started yet.
+        assertFalse(f.isDone());
+
+        // Action runs immediately
+        CancelHandleHelper.addCancelAction(token, action, f);
+        verify(action, times(2)).run();
+    }
+
+    @Test
+    public void testRunCancelActionImmediatelyIfCancelAsyncCalled() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        cancelHandle.cancelAsync();
+        assertTrue(cancelHandle.isCancelled());
+
+        Runnable action = Mockito.mock(Runnable.class);
+        CompletableFuture<Void> f = new CompletableFuture<>();
+
+        // Attach it to an operation that hasn't completed yet
+        CancelHandleHelper.addCancelAction(token, action, f);
+        verify(action, times(1)).run();
+
+        cancelHandle.cancelAsync().join();
+        // We do not wait for cancellation to complete because
+        // operation has not started yet.
+        assertFalse(f.isDone());
+
+        // Action runs immediately
+        CancelHandleHelper.addCancelAction(token, action, f);
+        verify(action, times(2)).run();
+    }
+
+    @Test
+    public void testArgumentsMustNotBeNull() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+        Runnable action = Mockito.mock(Runnable.class);
+        CompletableFuture<Void> f = CompletableFuture.completedFuture(null);
+
+        {
+            NullPointerException err = assertThrows(
+                    NullPointerException.class,
+                    () -> CancelHandleHelper.addCancelAction(null, action, f)
+            );
+            assertEquals("token", err.getMessage());
+        }
+
+        {
+            NullPointerException err = assertThrows(
+                    NullPointerException.class,
+                    () -> CancelHandleHelper.addCancelAction(token, null, f)
+            );
+            assertEquals("cancelAction", err.getMessage());
+        }
+
+        {
+            NullPointerException err = assertThrows(
+                    NullPointerException.class,
+                    () -> CancelHandleHelper.addCancelAction(token, action, 
null)
+            );
+            assertEquals("completionFut", err.getMessage());
+        }
+    }
+
+    @Test
+    public void testMultipleOperations() {
+        class Operation {
+            private final CountDownLatch latch = new CountDownLatch(1);
+            private final CompletableFuture<Void> cancelFut = new 
CompletableFuture<>();
+            private final Runnable cancelAction = () -> {
+                // Run in another thread to avoid blocking.
+                Thread thread = new Thread(() -> {
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                    cancelFut.complete(null);
+                });
+                thread.start();
+            };
+        }
+
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        Operation operation1 = new Operation();
+        Operation operation2 = new Operation();
+
+        CancelHandleHelper.addCancelAction(token, operation1.cancelAction, 
operation1.cancelFut);
+        CancelHandleHelper.addCancelAction(token, operation2.cancelAction, 
operation2.cancelFut);
+
+        cancelHandle.cancelAsync();
+        assertFalse(operation1.cancelFut.isDone());
+
+        // Cancel the first operation
+        operation1.latch.countDown();
+        operation1.cancelFut.join();
+
+        // The cancelHandle is still not done
+        assertFalse(cancelHandle.cancelAsync().isDone());
+
+        // Cancel the second operation
+        operation2.latch.countDown();
+        operation2.cancelFut.join();
+
+        cancelHandle.cancelAsync().join();
+        assertTrue(cancelHandle.cancelAsync().isDone());
+    }
+
+    @Test
+    public void testExceptionsInCancelActionsAreWrapped() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        RuntimeException e1 = new RuntimeException("e1");
+        Runnable r1 = () -> {
+            throw e1;
+        };
+        CompletableFuture<Object> f1 = new CompletableFuture<>();
+
+        RuntimeException e2 = new RuntimeException("e2");
+        Runnable r2 = () -> {
+            throw e2;
+        };
+        CompletableFuture<Object> f2 = new CompletableFuture<>();
+
+        CancelHandleHelper.addCancelAction(token, r1, f1);
+        CancelHandleHelper.addCancelAction(token, r2, f2);
+
+        f1.complete(null);
+        f2.complete(null);
+
+        IgniteException err = assertThrows(IgniteException.class, 
cancelHandle::cancel);
+
+        assertEquals("Failed to cancel an operation", err.getMessage());
+        assertEquals(Common.INTERNAL_ERR, err.code(), err.toString());
+        assertEquals(Arrays.asList(e1, e2), 
Arrays.asList(err.getSuppressed()));
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index 16117bf584..bde9e1b730 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -90,7 +90,7 @@ public class AbstractMultiNodeBenchmark {
 
             getAllFromCursor(
                     await(queryEngine.queryAsync(
-                            SqlPropertiesHelper.emptyProperties(), 
igniteImpl.observableTimeTracker(), null, createZoneStatement
+                            SqlPropertiesHelper.emptyProperties(), 
igniteImpl.observableTimeTracker(), null, null, createZoneStatement
                     ))
             );
 
@@ -136,7 +136,7 @@ public class AbstractMultiNodeBenchmark {
 
         getAllFromCursor(
                 await(igniteImpl.queryEngine().queryAsync(
-                        SqlPropertiesHelper.emptyProperties(), 
igniteImpl.observableTimeTracker(), null, createTableStatement
+                        SqlPropertiesHelper.emptyProperties(), 
igniteImpl.observableTimeTracker(), null, null, createTableStatement
                 ))
         );
     }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
index a33198c32c..52de6ec493 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
@@ -236,11 +236,11 @@ public class SelectBenchmark extends 
AbstractMultiNodeBenchmark {
         }
 
         private Iterator<InternalSqlRow> query(String sql, Object... args) {
-            return handleFirstBatch(queryProc.queryAsync(properties, 
igniteImpl.observableTimeTracker(), null, sql, args));
+            return handleFirstBatch(queryProc.queryAsync(properties, 
igniteImpl.observableTimeTracker(), null, null, sql, args));
         }
 
         private Iterator<InternalSqlRow> script(String sql, Object... args) {
-            return handleFirstBatch(queryProc.queryAsync(scriptProperties, 
igniteImpl.observableTimeTracker(), null, sql, args));
+            return handleFirstBatch(queryProc.queryAsync(scriptProperties, 
igniteImpl.observableTimeTracker(), null, null, sql, args));
         }
 
         private Iterator<InternalSqlRow> 
handleFirstBatch(CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFut) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java
index fe8059283a..2ccb23aa71 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java
@@ -380,14 +380,14 @@ public class SqlMultiStatementBenchmark extends 
AbstractMultiNodeBenchmark {
 
         Iterator<InternalSqlRow> execQuery(String sql, Object ... args) {
             AsyncSqlCursor<InternalSqlRow> cursor =
-                    queryProcessor.queryAsync(props, observableTimeTracker, 
null, sql, args).join();
+                    queryProcessor.queryAsync(props, observableTimeTracker, 
null, null, sql, args).join();
 
             return new InternalResultsIterator(cursor, pageSize);
         }
 
         Iterator<InternalSqlRow> execScript(String sql, Object ... args) {
             AsyncSqlCursor<InternalSqlRow> cursor =
-                    queryProcessor.queryAsync(scriptProps, 
observableTimeTracker, null, sql, args).join();
+                    queryProcessor.queryAsync(scriptProps, 
observableTimeTracker, null, null, sql, args).join();
 
             return new InternalResultsIterator(cursor, pageSize);
         }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
index e120ebfa19..f56e4cc5b6 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.wrapper.Wrapper;
 import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
@@ -58,71 +59,126 @@ class RestartProofIgniteSql implements IgniteSql, Wrapper {
     }
 
     @Override
-    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String 
query, @Nullable Object... arguments) {
-        return attachmentLock.attached(ignite -> 
ignite.sql().execute(transaction, query, arguments));
+    public ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return attachmentLock.attached(ignite -> 
ignite.sql().execute(transaction, cancellationToken, query, arguments));
     }
 
     @Override
-    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, 
Statement statement, @Nullable Object... arguments) {
-        return attachmentLock.attached(ignite -> 
ignite.sql().execute(transaction, statement, arguments));
+    public ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return attachmentLock.attached(ignite -> ignite.sql().execute(
+                transaction,
+                cancellationToken,
+                statement,
+                arguments)
+        );
     }
 
     @Override
     public <T> ResultSet<T> execute(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             String query,
             @Nullable Object... arguments
     ) {
-        return attachmentLock.attached(ignite -> 
ignite.sql().execute(transaction, mapper, query, arguments));
+        return attachmentLock.attached(ignite -> ignite.sql().execute(
+                transaction,
+                mapper,
+                cancellationToken,
+                query,
+                arguments)
+        );
     }
 
     @Override
     public <T> ResultSet<T> execute(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments
     ) {
-        return attachmentLock.attached(ignite -> 
ignite.sql().execute(transaction, mapper, statement, arguments));
+        return attachmentLock.attached(ignite -> ignite.sql().execute(
+                transaction,
+                mapper,
+                cancellationToken,
+                statement,
+                arguments)
+        );
     }
 
     @Override
     public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
             String query,
             @Nullable Object... arguments
     ) {
-        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(transaction, query, arguments));
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(
+                transaction,
+                cancellationToken,
+                query,
+                arguments)
+        );
     }
 
     @Override
     public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments
     ) {
-        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(transaction, statement, arguments));
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(
+                transaction,
+                cancellationToken,
+                statement,
+                arguments)
+        );
     }
 
     @Override
     public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             String query,
             @Nullable Object... arguments
     ) {
-        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(transaction, mapper, query, arguments));
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(
+                transaction,
+                mapper,
+                cancellationToken,
+                query,
+                arguments)
+        );
     }
 
     @Override
     public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments
     ) {
-        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(transaction, mapper, statement, arguments));
+        return attachmentLock.attachedAsync(ignite -> 
ignite.sql().executeAsync(
+                transaction,
+                mapper,
+                cancellationToken,
+                statement,
+                arguments)
+        );
     }
 
     @Override
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 9446c7ae8d..a888854552 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -20,28 +20,37 @@ package org.apache.ignite.internal.sql.api;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.ignite.internal.sql.SyncResultSetAdapter;
 import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.tx.Transaction;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
 
 /**
  * Tests for asynchronous SQL API.
@@ -95,6 +104,95 @@ public class ItSqlAsynchronousApiTest extends 
ItSqlApiBaseTest {
         }
     }
 
+    /** Cancels a running query submitted as a query string, both without and within an explicit transaction. */
+    @Test
+    public void cancelQueryString() throws InterruptedException {
+        IgniteSql sql = igniteSql();
+        String query = "SELECT * FROM system_range(0, 10000000000)";
+
+        // no transaction
+        executeAndCancel(token -> sql.executeAsync(null, token, query));
+
+        // with transaction
+        Transaction transaction = igniteTx().begin();
+        try {
+            executeAndCancel(token -> sql.executeAsync(transaction, token, query));
+        } finally {
+            // The test never commits the transaction; roll it back so it is not left open.
+            transaction.rollback();
+        }
+    }
+
+    /** Cancels a running query submitted as a {@link Statement}, both without and within an explicit transaction. */
+    @Test
+    public void cancelStatement() throws InterruptedException {
+        IgniteSql sql = igniteSql();
+
+        String query = "SELECT * FROM system_range(0, 10000000000)";
+
+        // The same statement is used for both runs.
+        Statement statement = sql.statementBuilder()
+                .query(query)
+                .build();
+
+        // no transaction
+        executeAndCancel(token -> sql.executeAsync(null, token, statement));
+
+        // with transaction
+        Transaction transaction = igniteTx().begin();
+        try {
+            executeAndCancel(token -> sql.executeAsync(transaction, token, statement));
+        } finally {
+            // The test never commits the transaction; roll it back so it is not left open.
+            transaction.rollback();
+        }
+    }
+
+    /**
+     * Starts a query via {@code execute}, waits until the returned future completes, requests cancellation through
+     * the {@link CancelHandle} and verifies that reading the results fails with {@code Sql.EXECUTION_CANCELLED_ERR}.
+     *
+     * @param execute Function that starts the query with the given cancellation token.
+     * @throws InterruptedException If the calling thread is interrupted while waiting for the first result set.
+     */
+    private static void executeAndCancel(
+            Function<CancellationToken, CompletableFuture<AsyncResultSet<SqlRow>>> execute
+    ) throws InterruptedException {
+        CancelHandle handle = CancelHandle.create();
+        CountDownLatch completed = new CountDownLatch(1);
+
+        CompletableFuture<AsyncResultSet<SqlRow>> firstPage = execute.apply(handle.token());
+
+        // Block until the initial future has completed (successfully or not); only then request cancellation.
+        firstPage.whenComplete((ignoredResult, ignoredError) -> completed.countDown());
+        completed.await();
+
+        handle.cancelAsync();
+
+        // Draining the result set of the cancelled query must fail with the cancellation error code.
+        CompletionException failure = assertThrows(CompletionException.class, new DrainResultSet(firstPage));
+        Throwable cause = failure.getCause();
+        SqlException sqlFailure = assertInstanceOf(SqlException.class, cause);
+        assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlFailure.code());
+
+        // A repeated cancellation request is expected to complete as well.
+        handle.cancelAsync().join();
+    }
+
+    /** Reads an asynchronous result set page by page until it has no more pages or a fetch fails. */
+    private static class DrainResultSet implements Executable {
+        /** Future of the page to read next; replaced on every fetch. */
+        CompletableFuture<? extends AsyncResultSet<SqlRow>> rs;
+
+        DrainResultSet(CompletableFuture<? extends AsyncResultSet<SqlRow>> rs) {
+            this.rs = rs;
+        }
+
+        /** Joins each page in turn; a failed page future surfaces as an exception thrown from {@code join()}. */
+        @Override
+        public void execute() {
+            while (true) {
+                AsyncResultSet<SqlRow> page = rs.join();
+
+                // Stop on a result without rows or on the last page.
+                if (!page.hasRowSet() || !page.hasMorePages()) {
+                    return;
+                }
+
+                rs = page.fetchNextPage();
+            }
+        }
+    }
+
     @Override
     protected ResultSet<SqlRow> executeForRead(IgniteSql sql, Transaction tx, 
Statement statement, Object... args) {
         return new SyncResultSetAdapter(await(sql.executeAsync(tx, statement, 
args)));
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index 22564c4121..8a16f708f1 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 
 /**
  * Tests for asynchronous client SQL API.
@@ -40,6 +41,18 @@ public class ItSqlClientAsynchronousApiTest extends 
ItSqlAsynchronousApiTest {
         client.close();
     }
 
+    // Disabled for the client API until the linked ticket is resolved.
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
+    @Override
+    public void cancelQueryString() throws InterruptedException {
+        super.cancelQueryString();
+    }
+
+    // Disabled for the client API until the linked ticket is resolved.
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
+    @Override
+    public void cancelStatement() throws InterruptedException {
+        super.cancelStatement();
+    }
+
     @Override
     protected IgniteSql igniteSql() {
         return client.sql();
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 283fb792e6..cdcba98bb2 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 
 /**
  * Tests for synchronous client SQL API.
@@ -40,6 +41,18 @@ public class ItSqlClientSynchronousApiTest extends 
ItSqlSynchronousApiTest {
         client.close();
     }
 
+    // Disabled for the client API until the linked ticket is resolved.
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
+    @Override
+    public void cancelQueryString() throws InterruptedException {
+        super.cancelQueryString();
+    }
+
+    // Disabled for the client API until the linked ticket is resolved.
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
+    @Override
+    public void cancelStatement() throws InterruptedException {
+        super.cancelStatement();
+    }
+
     @Override
     protected IgniteSql igniteSql() {
         return client.sql();
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index 329f82745e..f7cb46f1d9 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -19,20 +19,108 @@ package org.apache.ignite.internal.sql.api;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 
 /**
  * Tests for synchronous SQL API.
  */
 public class ItSqlSynchronousApiTest extends ItSqlApiBaseTest {
+
+    /** Explicit transactions started by the tests; rolled back after every test. */
+    private final List<Transaction> transactions = new ArrayList<>();
+
+    /** Rolls back all transactions registered by a test and empties the list. */
+    @AfterEach
+    public void rollbackTransactions() {
+        transactions.forEach(Transaction::rollback);
+        transactions.clear();
+    }
+
+    /** Cancels a running query submitted as a query string, both without and within an explicit transaction. */
+    @Test
+    public void cancelQueryString() throws InterruptedException {
+        IgniteSql sql = igniteSql();
+        String query = "SELECT * FROM system_range(0, 10000000000)";
+
+        // no transaction
+        executeAndCancel(token -> sql.execute(null, token, query));
+
+        // with transaction; registered so that it is rolled back after the test
+        Transaction transaction = igniteTx().begin();
+        transactions.add(transaction);
+
+        executeAndCancel(token -> sql.execute(transaction, token, query));
+    }
+
+    /** Cancels a running query submitted as a {@link Statement}, both without and within an explicit transaction. */
+    @Test
+    public void cancelStatement() throws InterruptedException {
+        IgniteSql sql = igniteSql();
+        String query = "SELECT * FROM system_range(0, 10000000000)";
+
+        // The same statement is used for both runs.
+        Statement statement = sql.statementBuilder()
+                .query(query)
+                .build();
+
+        // no transaction
+        executeAndCancel(token -> sql.execute(null, token, statement));
+
+        // with transaction; registered so that it is rolled back after the test
+        Transaction transaction = igniteTx().begin();
+        transactions.add(transaction);
+
+        executeAndCancel(token -> sql.execute(transaction, token, statement));
+    }
+
+    /**
+     * Runs {@code execute} on another thread, requests cancellation once the result set has been obtained and
+     * verifies that iterating the result set fails with {@code Sql.EXECUTION_CANCELLED_ERR}.
+     *
+     * @param execute Function that starts the query with the given cancellation token.
+     * @throws InterruptedException If the calling thread is interrupted while waiting for the query to start.
+     */
+    private static void executeAndCancel(Function<CancellationToken, ResultSet<SqlRow>> execute) throws InterruptedException {
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        // Run statement in another thread
+        CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
+            try (ResultSet<SqlRow> resultSet = execute.apply(cancelHandle.token())) {
+                // The result set is available: let the test thread cancel the query while rows are being read.
+                latch.countDown();
+                while (resultSet.hasNext()) {
+                    resultSet.next();
+                }
+            }
+        });
+
+        // If the statement fails before the result set is obtained, the latch above is never released.
+        // Release it on completion as well, so that the failure is reported below instead of blocking forever.
+        f.whenComplete((r, t) -> latch.countDown());
+
+        latch.await();
+        cancelHandle.cancelAsync();
+
+        CompletionException err = assertThrows(CompletionException.class, f::join);
+        SqlException sqlErr = assertInstanceOf(SqlException.class, err.getCause());
+        assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlErr.code());
+
+        // A repeated cancellation request is expected to complete as well.
+        cancelHandle.cancelAsync().join();
+    }
+
     @Override
     protected ResultSet<SqlRow> executeForRead(IgniteSql sql, Transaction tx, 
Statement statement, Object... args) {
         return sql.execute(tx, statement, args);
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java
index 23ed7fc3d8..9f40678fd1 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java
@@ -82,7 +82,7 @@ public abstract class BaseSqlMultiStatementTest extends 
BaseSqlIntegrationTest {
                 .build();
 
         AsyncSqlCursor<InternalSqlRow> cursor = await(
-                queryProcessor().queryAsync(properties, 
observableTimeTracker(), tx, query, params)
+                queryProcessor().queryAsync(properties, 
observableTimeTracker(), tx, null, query, params)
         );
 
         return Objects.requireNonNull(cursor);
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItQueryCancelTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItQueryCancelTest.java
new file mode 100644
index 0000000000..6c47f523e5
--- /dev/null
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItQueryCancelTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static 
org.apache.ignite.internal.sql.engine.QueryProperty.ALLOWED_QUERY_TYPES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.sql.engine.property.SqlProperties;
+import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.apache.ignite.sql.SqlException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** Set of test cases for query cancellation. */
+public class ItQueryCancelTest extends BaseSqlIntegrationTest {
+
+    /** Calling {@link CancelHandle#cancel()} should cancel execution of a single query. */
+    @Test
+    public void testCancelSingleQuery() {
+        SqlQueryProcessor qryProc = queryProcessor();
+        String query = multiRowValuesQuery();
+
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        AsyncSqlCursor<InternalSqlRow> query1 = startQuery(qryProc, token, query, Set.of(SqlQueryType.QUERY)).join();
+
+        assertEquals(1, qryProc.runningQueries());
+
+        // Request cancellation.
+        CompletableFuture<Void> cancelled = cancelHandle.cancelAsync();
+
+        // Observe cancellation error
+        expectQueryCancelled(new DrainCursor(query1));
+
+        cancelled.join();
+
+        assertEquals(0, qryProc.runningQueries());
+    }
+
+    /** Calling {@link CancelHandle#cancel()} should cancel execution of multiple queries. */
+    @Test
+    public void testCancelMultipleQueries() {
+        SqlQueryProcessor qryProc = queryProcessor();
+        String query = multiRowValuesQuery();
+
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        // Both queries share the same token, so a single cancellation request must stop both of them.
+        AsyncSqlCursor<InternalSqlRow> query1 = startQuery(qryProc, token, query, Set.of(SqlQueryType.QUERY)).join();
+        AsyncSqlCursor<InternalSqlRow> query2 = startQuery(qryProc, token, query, Set.of(SqlQueryType.QUERY)).join();
+
+        assertEquals(2, qryProc.runningQueries());
+
+        // Request cancellation.
+        CompletableFuture<Void> cancelled = cancelHandle.cancelAsync();
+
+        // Observe cancellation errors
+        expectQueryCancelled(new DrainCursor(query1));
+        expectQueryCancelled(new DrainCursor(query2));
+
+        cancelled.join();
+
+        assertEquals(0, qryProc.runningQueries());
+    }
+
+    /** Starting a query with a cancelled token should trigger query cancellation. */
+    @Test
+    public void testQueryWontStartWhenHandleIsCancelled() {
+        SqlQueryProcessor qryProc = queryProcessor();
+
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        // Cancel before the query is submitted.
+        cancelHandle.cancel();
+
+        Runnable run = () -> startQuery(qryProc, token, "SELECT 1", Set.of(SqlQueryType.QUERY)).join();
+
+        expectQueryCancelledWithQueryCancelledException(run);
+
+        assertEquals(0, qryProc.runningQueries());
+    }
+
+    /** Calling {@link CancelHandle#cancel()} should cancel execution of queries that use executable plans. */
+    @ParameterizedTest
+    @ValueSource(strings = {
+            "SELECT * FROM t WHERE id = 1",
+            "INSERT INTO t VALUES (1, 1)",
+            "SELECT COUNT(*) FROM t",
+    })
+    public void testExecutablePlans(String query) {
+        sql("CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, val INT)");
+
+        SqlQueryProcessor qryProc = queryProcessor();
+
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        // The query is submitted only when the runnable is invoked, i.e. after cancellation has been requested.
+        Runnable run = () -> startQuery(qryProc, token, query, Set.of(SqlQueryType.QUERY, SqlQueryType.DML)).join();
+
+        // Request cancellation
+        CompletableFuture<Void> f = cancelHandle.cancelAsync();
+
+        // Observe cancellation error
+        expectQueryCancelledWithQueryCancelledException(run);
+
+        f.join();
+
+        assertEquals(0, qryProc.runningQueries());
+    }
+
+    /**
+     * Submits a query outside of an explicit transaction, bound to the given cancellation token.
+     *
+     * @param qryProc Query processor to run the query on.
+     * @param token Cancellation token to associate with the query.
+     * @param query SQL text.
+     * @param allowedTypes Query types the statement is allowed to have.
+     * @return Future of the query cursor.
+     */
+    private CompletableFuture<AsyncSqlCursor<InternalSqlRow>> startQuery(
+            SqlQueryProcessor qryProc,
+            CancellationToken token,
+            String query,
+            Set<SqlQueryType> allowedTypes
+    ) {
+        IgniteImpl igniteImpl = TestWrappers.unwrapIgniteImpl(CLUSTER.node(0));
+
+        SqlProperties properties = SqlPropertiesHelper.newBuilder()
+                .set(ALLOWED_QUERY_TYPES, allowedTypes)
+                .build();
+
+        HybridTimestampTracker hybridTimestampTracker = igniteImpl.observableTimeTracker();
+
+        return qryProc.queryAsync(
+                properties,
+                hybridTimestampTracker,
+                null,
+                token,
+                query
+        );
+    }
+
+    /** Builds a query that selects the values 0..99 from an inline {@code VALUES} list. */
+    private static String multiRowValuesQuery() {
+        StringBuilder query = new StringBuilder();
+
+        query.append("SELECT v FROM (VALUES ");
+        for (int i = 0; i < 100; i++) {
+            if (i > 0) {
+                query.append(", ");
+            }
+            query.append("(");
+            query.append(i);
+            query.append(")");
+        }
+        query.append(" ) t(v)");
+
+        return query.toString();
+    }
+
+    /** Asserts that {@code action} fails with a {@link SqlException} carrying {@code Sql.EXECUTION_CANCELLED_ERR}. */
+    private static void expectQueryCancelled(Runnable action) {
+        CompletionException err = assertThrows(CompletionException.class, action::run);
+        SqlException sqlErr = assertInstanceOf(SqlException.class, err.getCause());
+        assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlErr.code(), sqlErr.toString());
+    }
+
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-23637 remove this method and use expectQueryCancelled instead.
+    private static void expectQueryCancelledWithQueryCancelledException(Runnable action) {
+        CompletionException err = assertThrows(CompletionException.class, action::run);
+        assertInstanceOf(QueryCancelledException.class, err.getCause());
+    }
+
+    /** Reads a cursor one row at a time until an empty batch is returned or a request fails. */
+    private static class DrainCursor implements Runnable {
+        final AsyncSqlCursor<?> cursor;
+
+        DrainCursor(AsyncSqlCursor<?> cursor) {
+            this.cursor = cursor;
+        }
+
+        @Override
+        public void run() {
+            BatchedResult<?> batch;
+            do {
+                batch = cursor.requestNextAsync(1).join();
+            } while (!batch.items().isEmpty());
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index 5a2da1d991..fec0ee6785 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.lang.TraceableException;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.IgniteSql;
@@ -201,11 +202,16 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
 
     /** {@inheritDoc} */
     @Override
-    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String 
query, @Nullable Object... arguments) {
+    public ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    ) {
         Objects.requireNonNull(query);
 
         try {
-            return new SyncResultSetAdapter<>(executeAsync(transaction, query, 
arguments).join());
+            return new SyncResultSetAdapter<>(executeAsync(transaction, 
cancellationToken, query, arguments).join());
         } catch (CompletionException e) {
             throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
@@ -213,11 +219,16 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
 
     /** {@inheritDoc} */
     @Override
-    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, 
Statement statement, @Nullable Object... arguments) {
+    public ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
         Objects.requireNonNull(statement);
 
         try {
-            return new SyncResultSetAdapter<>(executeAsync(transaction, 
statement, arguments).join());
+            return new SyncResultSetAdapter<>(executeAsync(transaction, 
cancellationToken, statement, arguments).join());
         } catch (CompletionException e) {
             throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
@@ -225,15 +236,16 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
 
     /** {@inheritDoc} */
     @Override
-    public  <T> ResultSet<T> execute(
+    public <T> ResultSet<T> execute(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             String query,
             @Nullable Object... arguments) {
         Objects.requireNonNull(query);
 
         try {
-            return new SyncResultSetAdapter<>(executeAsync(transaction, 
mapper, query, arguments).join());
+            return new SyncResultSetAdapter<>(executeAsync(transaction, 
mapper, cancellationToken, query, arguments).join());
         } catch (CompletionException e) {
             throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
         }
@@ -241,9 +253,10 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
 
     /** {@inheritDoc} */
     @Override
-    public  <T> ResultSet<T> execute(
+    public <T> ResultSet<T> execute(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments) {
         Objects.requireNonNull(statement);
@@ -291,26 +304,32 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
     @Override
     public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
             String query,
             @Nullable Object... arguments
     ) {
-        return executeAsyncInternal(transaction, createStatement(query), 
arguments);
+        return executeAsyncInternal(transaction, cancellationToken, 
createStatement(query), arguments);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments
     ) {
-        return executeAsyncInternal(transaction, statement, arguments);
+        return executeAsyncInternal(transaction, cancellationToken, statement, 
arguments);
     }
 
     /** {@inheritDoc} */
     @Override
-    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(@Nullable 
Transaction transaction, @Nullable Mapper<T> mapper,
-            String query, @Nullable Object... arguments) {
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
+            String query, @Nullable Object... arguments
+    ) {
         // TODO: IGNITE-18695.
         throw new UnsupportedOperationException("Not implemented yet.");
     }
@@ -320,14 +339,17 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
     public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
-            @Nullable Object... arguments) {
+            @Nullable Object... arguments
+    ) {
         // TODO: IGNITE-18695.
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
     private CompletableFuture<AsyncResultSet<SqlRow>> executeAsyncInternal(
             @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments
     ) {
@@ -347,7 +369,12 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
                     .build();
 
             result = queryProcessor.queryAsync(
-                    properties, observableTimestampTracker, 
(InternalTransaction) transaction, statement.query(), arguments
+                    properties,
+                    observableTimestampTracker,
+                    (InternalTransaction) transaction,
+                    cancellationToken,
+                    statement.query(),
+                    arguments
             ).thenCompose(cur -> {
                 if (!busyLock.enterBusy()) {
                     cur.closeAsync();
@@ -459,7 +486,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
                 }
 
                 try {
-                    return queryProcessor.queryAsync(properties0, 
observableTimestampTracker, transaction, query, args)
+                    return queryProcessor.queryAsync(properties0, 
observableTimestampTracker, transaction, null, query, args)
                             .thenCompose(cursor -> {
                                 if (!enterBusy.get()) {
                                     cursor.closeAsync();
@@ -578,8 +605,14 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
                 .set(QueryProperty.ALLOWED_QUERY_TYPES, SqlQueryType.ALL)
                 .build());
 
-        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> f =
-                queryProcessor.queryAsync(properties0, 
observableTimestampTracker, null, query, arguments);
+        CompletableFuture<AsyncSqlCursor<InternalSqlRow>> f = 
queryProcessor.queryAsync(
+                properties0,
+                observableTimestampTracker,
+                null,
+                null,
+                query,
+                arguments
+        );
 
         CompletableFuture<Void> resFut = new CompletableFuture<>();
         ScriptHandler handler = new ScriptHandler(resFut, enterBusy, 
leaveBusy);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java
index 6d3a33c2b9..24e774c867 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Executor;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.thread.PublicApiThreading;
 import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
@@ -61,71 +62,87 @@ public class PublicApiThreadingIgniteSql implements 
IgniteSql, Wrapper {
     }
 
     @Override
-    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String 
query, @Nullable Object... arguments) {
-        return execUserSyncOperation(() -> sql.execute(transaction, query, 
arguments));
+    public ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return execUserSyncOperation(() -> sql.execute(transaction, 
cancellationToken, query, arguments));
     }
 
     @Override
-    public ResultSet<SqlRow> execute(@Nullable Transaction transaction, 
Statement statement, @Nullable Object... arguments) {
-        return execUserSyncOperation(() -> sql.execute(transaction, statement, 
arguments));
+    public ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return execUserSyncOperation(() -> sql.execute(transaction, 
cancellationToken, statement, arguments));
     }
 
     @Override
     public <T> ResultSet<T> execute(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             String query,
             @Nullable Object... arguments
     ) {
-        return execUserSyncOperation(() -> sql.execute(transaction, mapper, 
query, arguments));
+        return execUserSyncOperation(() -> sql.execute(transaction, mapper, 
cancellationToken, query, arguments));
     }
 
     @Override
     public <T> ResultSet<T> execute(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments
     ) {
-        return execUserSyncOperation(() -> sql.execute(transaction, mapper, 
statement, arguments));
+        return execUserSyncOperation(() -> sql.execute(transaction, mapper, 
cancellationToken, statement, arguments));
     }
 
     @Override
     public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
             String query,
             @Nullable Object... arguments
     ) {
-        return doAsyncOperationForResultSet(() -> 
sql.executeAsync(transaction, query, arguments));
+        return doAsyncOperationForResultSet(() -> 
sql.executeAsync(transaction, cancellationToken, query, arguments));
     }
 
     @Override
     public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments
     ) {
-        return doAsyncOperationForResultSet(() -> 
sql.executeAsync(transaction, statement, arguments));
+        return doAsyncOperationForResultSet(() -> 
sql.executeAsync(transaction, cancellationToken, statement, arguments));
     }
 
     @Override
     public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             String query,
             @Nullable Object... arguments
     ) {
-        return doAsyncOperationForResultSet(() -> 
sql.executeAsync(transaction, mapper, query, arguments));
+        return doAsyncOperationForResultSet(() -> 
sql.executeAsync(transaction, mapper, cancellationToken, query, arguments));
     }
 
     @Override
     public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
             @Nullable Transaction transaction,
             @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
             Statement statement,
             @Nullable Object... arguments
     ) {
-        return doAsyncOperationForResultSet(() -> 
sql.executeAsync(transaction, mapper, statement, arguments));
+        return doAsyncOperationForResultSet(() -> 
sql.executeAsync(transaction, mapper, cancellationToken, statement, arguments));
     }
 
     @Override
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
index 65ce14b2b0..1b1fb5ab66 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
@@ -88,14 +88,14 @@ public class QueryCancel {
     }
 
     /** Returns {@code true} if the cancellation procedure has already been 
started. */
-    public synchronized boolean isCancelled() {
+    public boolean isCancelled() {
         return state.isDone();
     }
 
     private static void throwException(Reason reason) {
         throw new QueryCancelledException(
                 reason == Reason.TIMEOUT
-                        ? QueryCancelledException.TIMEOUT_MSG 
+                        ? QueryCancelledException.TIMEOUT_MSG
                         : QueryCancelledException.CANCEL_MSG
         );
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index 38471488c9..68b30ec977 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -23,6 +23,7 @@ import 
org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
 import org.apache.ignite.internal.sql.engine.property.SqlProperties;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.lang.IgniteException;
 import org.jetbrains.annotations.Nullable;
 
@@ -32,21 +33,23 @@ import org.jetbrains.annotations.Nullable;
 public interface QueryProcessor extends IgniteComponent {
 
     /**
-     * Returns columns and parameters metadata for the given statement.
-     * This method uses optional array of parameters to assist with type 
inference.
+     * Returns columns and parameters metadata for the given statement. This 
method uses an optional array of parameters to assist with type
+     * inference.
      *
      * @param properties User query properties. See {@link QueryProperty} for 
available properties.
      * @param transaction A transaction to use to resolve a schema.
      * @param qry Single statement SQL query.
      * @param params Query parameters.
      * @return Query metadata.
-     *
      * @throws IgniteException in case of an error.
      * @see QueryProperty
      */
-    CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties 
properties,
+    CompletableFuture<QueryMetadata> prepareSingleAsync(
+            SqlProperties properties,
             @Nullable InternalTransaction transaction,
-            String qry, Object... params);
+            String qry,
+            Object... params
+    );
 
     /**
      * Execute the query with given schema name and parameters.
@@ -55,6 +58,7 @@ public interface QueryProcessor extends IgniteComponent {
      * @param observableTime Tracker of the latest time observed by client.
      * @param transaction A transaction to use for query execution. If null, 
an implicit transaction
      *      will be started by provided transactions facade.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param qry SQL query.
      * @param params Query parameters.
      * @return Sql cursor.
@@ -66,6 +70,7 @@ public interface QueryProcessor extends IgniteComponent {
             SqlProperties properties,
             HybridTimestampTracker observableTime,
             @Nullable InternalTransaction transaction,
+            @Nullable CancellationToken cancellationToken,
             String qry,
             Object... params
     );
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 0c7f9047e8..3174d51c8c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -101,6 +101,7 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -407,6 +408,7 @@ public class SqlQueryProcessor implements QueryProcessor, 
SystemViewProvider {
             SqlProperties properties,
             HybridTimestampTracker observableTimeTracker,
             @Nullable InternalTransaction transaction,
+            @Nullable CancellationToken cancellationToken,
             String qry,
             Object... params
     ) {
@@ -418,7 +420,13 @@ public class SqlQueryProcessor implements QueryProcessor, 
SystemViewProvider {
             QueryTransactionContext txContext = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, transaction,
                     txTracker);
 
-            return queryExecutor.executeQuery(properties, txContext, qry, 
params);
+            return queryExecutor.executeQuery(
+                    properties,
+                    txContext,
+                    qry,
+                    cancellationToken,
+                    params
+            );
         } finally {
             busyLock.leaveBusy();
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 229cd17c86..f582767d09 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgniteStringBuilder;
+import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.TopologyEventHandler;
@@ -304,7 +305,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
 
         assert cancelHandler != null;
 
-        // This call triggers a timeout exception, if operation has timed out.
+        // This call immediately triggers a cancellation exception if the 
operation has timed out or has already been cancelled.
         cancelHandler.add(timeout -> {
             QueryCompletionReason reason = timeout ? 
QueryCompletionReason.TIMEOUT : QueryCompletionReason.CANCEL;
             queryManager.close(reason);
@@ -463,11 +464,9 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         assert queryCancel != null;
 
         queryCancel.add(timeout -> {
-            if (!timeout) {
-                return;
+            if (timeout) {
+                ret.completeExceptionally(new 
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG));
             }
-
-            ret.completeExceptionally(new 
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG));
         });
 
         return new IteratorToDataCursorAdapter<>(ret, Runnable::run);
@@ -976,6 +975,22 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, TopologyEve
         private AsyncCursor<InternalSqlRow> execute(InternalTransaction tx, 
MultiStepPlan multiStepPlan) {
             assert root != null;
 
+            // Query has already been cancelled: return a cursor whose fetches fail with a cancellation error.
+            if (cancelled.get()) {
+                return new AsyncCursor<>() {
+                    @Override
+                    public CompletableFuture<BatchedResult<InternalSqlRow>> 
requestNextAsync(int rows) {
+                        Throwable t = 
SqlExceptionMapperUtil.mapToPublicSqlException(new QueryCancelledException());
+                        return CompletableFuture.failedFuture(t);
+                    }
+
+                    @Override
+                    public CompletableFuture<Void> closeAsync() {
+                        return DistributedQueryManager.this.cancelFut;
+                    }
+                };
+            }
+
             boolean mapOnBackups = tx.isReadOnly();
             MappingParameters mappingParameters = 
MappingParameters.create(ctx.parameters(), mapOnBackups);
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
index 6154a57958..b7a6414a92 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
@@ -38,7 +38,7 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Represents a query initiated on current node.
- * 
+ *
  * <p>Encapsulates intermediate state populated throughout query lifecycle.
  */
 class Query implements Runnable {
@@ -170,7 +170,7 @@ class Query implements Runnable {
         return onPhaseStartedCallback.computeIfAbsent(phase, k -> new 
CompletableFuture<>());
     }
 
-    /** Moves the query to a given state. */ 
+    /** Moves the query to a given state. */
     void moveTo(ExecutionPhase newPhase) {
         synchronized (mux) {
             assert currentPhase.transitionAllowed(newPhase) : "currentPhase=" 
+ currentPhase + ", newPhase=" + newPhase;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
index c955914834..6386895cdb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
@@ -55,6 +55,8 @@ import 
org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
 import org.apache.ignite.internal.sql.engine.util.cache.Cache;
 import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.CancelHandleHelper;
+import org.apache.ignite.lang.CancellationToken;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -128,6 +130,7 @@ public class QueryExecutor implements LifecycleAware {
      * @param properties User query properties. See {@link QueryProperty} for 
available properties.
      * @param txContext Transactional context to use.
      * @param sql Query string.
+     * @param cancellationToken Cancellation token or {@code null}.
      * @param params Query parameters.
      * @return Future which will be completed with cursor.
      */
@@ -135,7 +138,8 @@ public class QueryExecutor implements LifecycleAware {
             SqlProperties properties,
             QueryTransactionContext txContext,
             String sql,
-            Object... params
+            @Nullable CancellationToken cancellationToken,
+            Object[] params
     ) {
         SqlProperties properties0 = SqlPropertiesHelper.chain(properties, 
defaultProperties);
 
@@ -155,7 +159,7 @@ public class QueryExecutor implements LifecycleAware {
         }
 
         try {
-            trackQuery(query);
+            trackQuery(query, cancellationToken);
         } finally {
             busyLock.leaveBusy();
         }
@@ -199,7 +203,7 @@ public class QueryExecutor implements LifecycleAware {
         }
 
         try {
-            trackQuery(query);
+            trackQuery(query, null);
         } finally {
             busyLock.leaveBusy();
         }
@@ -310,13 +314,19 @@ public class QueryExecutor implements LifecycleAware {
         );
     }
 
-    private void trackQuery(Query query) {
+    private void trackQuery(Query query, @Nullable CancellationToken 
cancellationToken) {
         Query old = runningQueries.put(query.id, query);
 
         assert old == null : "Query with the same id already registered";
 
-        query.onPhaseStarted(ExecutionPhase.TERMINATED)
-                .whenComplete((ignored, ex) -> 
runningQueries.remove(query.id));
+        CompletableFuture<Void> queryTerminationFut = 
query.onPhaseStarted(ExecutionPhase.TERMINATED);
+        CompletableFuture<Void> queryTerminationDoneFut = 
queryTerminationFut.whenComplete((ignored, ex) -> {
+            runningQueries.remove(query.id);
+        });
+
+        if (cancellationToken != null) {
+            CancelHandleHelper.addCancelAction(cancellationToken, 
query.cancel::cancel, queryTerminationDoneFut);
+        }
     }
 
     /** Returns list of queries registered on server at the moment. */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 690c23d767..7259223588 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -231,8 +231,6 @@ public class PrepareServiceImpl implements PrepareService {
             return CompletableFuture.failedFuture(new 
SchemaNotFoundException(schemaName));
         }
 
-        // Add an action to trigger planner timeout, when operation times out.
-        // Or trigger timeout immediately if operation has already timed out.
         QueryCancel cancelHandler = operationContext.cancel();
         assert cancelHandler != null;
         boolean explicitTx = operationContext.txContext() != null && 
operationContext.txContext().explicitTx() != null;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
index 198bfedff5..3d7dc79a64 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
@@ -98,7 +98,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
         when(result.onClose())
                 .thenReturn(closeFuture);
 
-        when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class)))
+        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class)))
                 .thenReturn(completedFuture(result));
 
         AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1"));
@@ -121,7 +121,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
         when(result.onClose()).thenReturn(new CompletableFuture<>());
         when(result.closeAsync()).thenReturn(nullCompletedFuture());
 
-        when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class)))
+        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class)))
                 .thenReturn(completedFuture(result));
 
         AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1"));
@@ -139,7 +139,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
     void resultSetIsNotCreatedIfComponentIsStoppedInMiddleOfOperation() throws 
Exception {
         CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture = new 
CompletableFuture<>();
         CountDownLatch executeQueryLatch = new CountDownLatch(1);
-        when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class)))
+        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class)))
                 .thenAnswer(ignored -> {
                     executeQueryLatch.countDown();
                     return cursorFuture;
@@ -175,7 +175,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
 
         CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture = new 
CompletableFuture<>();
         CountDownLatch executeQueryLatch = new CountDownLatch(3);
-        when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class)))
+        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class)))
                 .thenAnswer(ignored -> {
                     executeQueryLatch.countDown();
 
@@ -206,7 +206,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
                 () -> await(result)
         );
         assertThat(igniteSql.openedCursors(), empty());
-        verify(queryProcessor, times(3)).queryAsync(any(), any(), any(), 
any(), any(Object[].class));
+        verify(queryProcessor, times(3)).queryAsync(any(), any(), any(), 
any(), any(), any(Object[].class));
         verify(cursor).closeAsync();
     }
 
@@ -232,7 +232,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
         when(result.onClose())
                 .thenReturn(closeFuture);
 
-        when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class)))
+        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class)))
                 .thenReturn(completedFuture(result));
 
         AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1"));
@@ -264,7 +264,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
         when(cursor2.hasNextResult()).thenReturn(false);
         when(cursor2.closeAsync()).thenReturn(nullCompletedFuture());
 
-        when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class)))
+        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class)))
                 .thenReturn(completedFuture(cursor1));
 
         Void rs = await(igniteSql.executeScriptAsync("SELECT 1; SELECT 2"));
@@ -281,7 +281,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
         
when(cursor1.nextResult()).thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("Broken")));
         when(cursor1.closeAsync()).thenReturn(nullCompletedFuture());
 
-        when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class)))
+        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class)))
                 .thenReturn(completedFuture(cursor1));
 
         assertThrowsSqlException(
@@ -310,7 +310,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
         
when(cursor2.nextResult()).thenReturn(CompletableFuture.failedFuture(lastCursorScriptException));
         
when(cursor2.closeAsync()).thenReturn(CompletableFuture.failedFuture(cursorCloseException2));
 
-        when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class)))
+        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class)))
                 .thenReturn(completedFuture(cursor1));
 
         SqlException sqlEx = assertThrowsExactly(SqlException.class, () -> 
await(igniteSql.executeScriptAsync("SELECT 1; SELECT 2")));
@@ -336,7 +336,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
 
         when(cursor2.closeAsync()).thenReturn(nullCompletedFuture());
 
-        when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class)))
+        when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class)))
                 .thenReturn(completedFuture(cursor1));
 
         Thread thread = new Thread(() -> {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
index 6d213cbee0..c26427b329 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.lang.CancellationToken;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -120,8 +121,12 @@ public class TransactionEnlistTest extends 
BaseIgniteAbstractTest {
         }
 
         @Override
-        public CompletableFuture<QueryMetadata> 
prepareSingleAsync(SqlProperties properties,
-                @Nullable InternalTransaction transaction, String qry, 
Object... params) {
+        public CompletableFuture<QueryMetadata> prepareSingleAsync(
+                SqlProperties properties,
+                @Nullable InternalTransaction transaction,
+                String qry,
+                Object... params
+        ) {
             assert params == null || params.length == 0 : "params are not 
supported";
             assert prepareOnly : "Expected that the query will be executed";
 
@@ -135,6 +140,7 @@ public class TransactionEnlistTest extends 
BaseIgniteAbstractTest {
                 SqlProperties properties,
                 HybridTimestampTracker observableTimeTracker,
                 @Nullable InternalTransaction transaction,
+                @Nullable CancellationToken cancellationToken,
                 String qry,
                 Object... params
         ) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 0eb50b9a33..61abdcbd3b 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -88,6 +88,7 @@ import 
org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
 import 
org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl;
 import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.sql.engine.QueryCancel;
 import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
 import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
@@ -511,6 +512,9 @@ public class TestBuilders {
         /** Sets the dynamic parameters this fragment will be executed with. */
         ExecutionContextBuilder dynamicParameters(Object... params);
 
+        /** Sets the query cancellation procedure. */
+        ExecutionContextBuilder queryCancel(QueryCancel queryCancel);
+
         /**
          * Builds the context object.
          *
@@ -526,6 +530,7 @@ public class TestBuilders {
         private QueryTaskExecutor executor = null;
         private ClusterNode node = null;
         private Object[] dynamicParams = ArrayUtils.OBJECT_EMPTY_ARRAY;
+        private QueryCancel queryCancel = null;
 
         /** {@inheritDoc} */
         @Override
@@ -559,12 +564,20 @@ public class TestBuilders {
             return this;
         }
 
+        /** {@inheritDoc} */
         @Override
         public ExecutionContextBuilder dynamicParameters(Object... params) {
             this.dynamicParams = params;
             return this;
         }
 
+        /** {@inheritDoc} */
+        @Override
+        public ExecutionContextBuilder queryCancel(QueryCancel queryCancel) {
+            this.queryCancel = queryCancel;
+            return this;
+        }
+
         /** {@inheritDoc} */
         @Override
         public ExecutionContext<Object[]> build() {
@@ -578,7 +591,7 @@ public class TestBuilders {
                     Commons.parametersMap(dynamicParams),
                     TxAttributes.fromTx(new NoOpTransaction(node.name())),
                     SqlQueryProcessor.DEFAULT_TIME_ZONE_ID,
-                    null
+                    queryCancel
             );
         }
     }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
index 7cbdc2ab80..75d4a8ddbd 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ColumnType;
 import org.jetbrains.annotations.Nullable;
@@ -304,8 +305,12 @@ public class QueryCheckerTest extends 
BaseIgniteAbstractTest {
         }
 
         @Override
-        public CompletableFuture<QueryMetadata> 
prepareSingleAsync(SqlProperties properties,
-                @Nullable InternalTransaction transaction, String qry, 
Object... params) {
+        public CompletableFuture<QueryMetadata> prepareSingleAsync(
+                SqlProperties properties,
+                @Nullable InternalTransaction transaction,
+                String qry,
+                Object... params
+        ) {
             assert params == null || params.length == 0 : "params are not 
supported";
             assert prepareOnly : "Expected that the query will be executed";
 
@@ -319,6 +324,7 @@ public class QueryCheckerTest extends 
BaseIgniteAbstractTest {
                 SqlProperties properties,
                 HybridTimestampTracker observableTimeTracker,
                 @Nullable InternalTransaction transaction,
+                @Nullable CancellationToken cancellationToken,
                 String qry,
                 Object... params
         ) {
diff --git 
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java
 
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java
index 56dd52f50b..2c7ec723e2 100644
--- 
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java
+++ 
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java
@@ -327,7 +327,13 @@ abstract class QueryCheckerImpl implements QueryChecker {
 
         if (!CollectionUtils.nullOrEmpty(planMatchers)) {
             CompletableFuture<AsyncSqlCursor<InternalSqlRow>> explainCursors = 
qryProc.queryAsync(
-                    properties, observableTimeTracker(), tx, "EXPLAIN PLAN FOR 
" + qry, params);
+                    properties,
+                    observableTimeTracker(),
+                    tx,
+                    null,
+                    "EXPLAIN PLAN FOR " + qry,
+                    params
+            );
             AsyncSqlCursor<InternalSqlRow> explainCursor = 
await(explainCursors);
             List<InternalSqlRow> explainRes = getAllFromCursor(explainCursor);
 
@@ -353,7 +359,7 @@ abstract class QueryCheckerImpl implements QueryChecker {
 
         // Check result.
         CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursors =
-                bypassingThreadAssertionsAsync(() -> 
qryProc.queryAsync(properties, observableTimeTracker(), tx, qry, params));
+                bypassingThreadAssertionsAsync(() -> 
qryProc.queryAsync(properties, observableTimeTracker(), tx, null, qry, params));
 
         AsyncSqlCursor<InternalSqlRow> cur = await(cursors);
 

Reply via email to