This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ec70aea639 IGNITE-23430: Sql. Provide an ability to cancel query
before first page ready (#4615)
ec70aea639 is described below
commit ec70aea639c184dd883d9fc09d050514764d93fe
Author: Max Zhuravkov <[email protected]>
AuthorDate: Mon Nov 11 08:22:46 2024 +0200
IGNITE-23430: Sql. Provide an ability to cancel query before first page
ready (#4615)
---
.../java/org/apache/ignite/lang/CancelHandle.java | 62 +++++
.../org/apache/ignite/lang/CancelHandleImpl.java | 163 ++++++++++++
.../org/apache/ignite/lang/CancellationToken.java} | 13 +-
.../main/java/org/apache/ignite/sql/IgniteSql.java | 189 ++++++++++++-
.../client/handler/JdbcQueryEventHandlerImpl.java | 2 +
.../requests/sql/ClientSqlExecuteRequest.java | 9 +-
.../handler/JdbcQueryEventHandlerImplTest.java | 4 +-
.../ignite/internal/client/sql/ClientSql.java | 41 ++-
.../client/fakes/FakeIgniteQueryProcessor.java | 10 +-
.../apache/ignite/internal/util/Cancellable.java | 1 +
.../org/apache/ignite/lang/CancelHandleHelper.java | 71 +++++
.../ignite/lang/CancelHandleHelperSelfTest.java | 292 +++++++++++++++++++++
.../benchmark/AbstractMultiNodeBenchmark.java | 4 +-
.../ignite/internal/benchmark/SelectBenchmark.java | 4 +-
.../benchmark/SqlMultiStatementBenchmark.java | 4 +-
.../internal/restart/RestartProofIgniteSql.java | 76 +++++-
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 98 +++++++
.../sql/api/ItSqlClientAsynchronousApiTest.java | 13 +
.../sql/api/ItSqlClientSynchronousApiTest.java | 13 +
.../internal/sql/api/ItSqlSynchronousApiTest.java | 88 +++++++
.../sql/engine/BaseSqlMultiStatementTest.java | 2 +-
.../internal/sql/engine/ItQueryCancelTest.java | 255 ++++++++++++++++++
.../ignite/internal/sql/api/IgniteSqlImpl.java | 65 +++--
.../sql/api/PublicApiThreadingIgniteSql.java | 37 ++-
.../ignite/internal/sql/engine/QueryCancel.java | 4 +-
.../ignite/internal/sql/engine/QueryProcessor.java | 15 +-
.../internal/sql/engine/SqlQueryProcessor.java | 10 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 25 +-
.../ignite/internal/sql/engine/exec/fsm/Query.java | 4 +-
.../sql/engine/exec/fsm/QueryExecutor.java | 22 +-
.../sql/engine/prepare/PrepareServiceImpl.java | 2 -
.../ignite/internal/sql/api/IgniteSqlImplTest.java | 20 +-
.../sql/engine/exec/TransactionEnlistTest.java | 10 +-
.../sql/engine/framework/TestBuilders.java | 15 +-
.../internal/sql/engine/util/QueryCheckerTest.java | 10 +-
.../internal/sql/engine/util/QueryCheckerImpl.java | 10 +-
36 files changed, 1546 insertions(+), 117 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/CancelHandle.java
b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandle.java
new file mode 100644
index 0000000000..1f81df251b
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandle.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A handle which may be used to request the cancellation of execution.
+ */
+public interface CancelHandle {
+
+ /** A factory method to create a handle. */
+ static CancelHandle create() {
+ return new CancelHandleImpl();
+ }
+
+ /**
+ * Abruptly terminates an execution of an associated process.
+ *
+ * <p>Control flow will return after the process has been terminated and
the resources associated with that process have been freed.
+ */
+ void cancel();
+
+ /**
+ * Abruptly terminates an execution of an associated process.
+ *
+ * @return A future that will be completed after the process has been
terminated and the resources associated with that process have
+ * been freed.
+ */
+ CompletableFuture<Void> cancelAsync();
+
+ /**
+ * Flag indicating whether cancellation was requested or not.
+ *
+ * <p>This method will return true even if cancellation has not been
completed yet.
+ *
+ * @return {@code true} when cancellation was requested.
+ */
+ boolean isCancelled();
+
+ /**
+ * Issue a token associated with this handle.
+ *
+ * <p>Token is reusable, meaning the same token may be used to link
several executions into a single cancellable.
+ */
+ CancellationToken token();
+}
diff --git
a/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
new file mode 100644
index 0000000000..a28c97e3d0
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.ErrorGroups.Common;
+
+/** Implementation of {@link CancelHandle}. */
+final class CancelHandleImpl implements CancelHandle {
+
+ private final CompletableFuture<Void> cancelFut = new
CompletableFuture<>();
+
+ private final CancellationTokenImpl token;
+
+ CancelHandleImpl() {
+ this.token = new CancellationTokenImpl(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void cancel() {
+ doCancelAsync();
+
+ cancelFut.join();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> cancelAsync() {
+ doCancelAsync();
+
+ // Make a copy of internal future, so that it is not possible to
complete it
+ return cancelFut.copy();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean isCancelled() {
+ return token.isCancelled();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CancellationToken token() {
+ return token;
+ }
+
+ private void doCancelAsync() {
+ token.cancel();
+ }
+
+ static final class CancellationTokenImpl implements CancellationToken {
+
+ private final ArrayDeque<Cancellation> cancellations = new
ArrayDeque<>();
+
+ private final CancelHandleImpl handle;
+
+ private final Object mux = new Object();
+
+ private volatile CompletableFuture<Void> cancelFut;
+
+ CancellationTokenImpl(CancelHandleImpl handle) {
+ this.handle = handle;
+ }
+
+ void addCancelAction(Runnable action, CompletableFuture<?> fut) {
+ Cancellation cancellation = new Cancellation(action, fut);
+
+ if (cancelFut != null) {
+ cancellation.run();
+ } else {
+ synchronized (mux) {
+ if (cancelFut == null) {
+ cancellations.add(cancellation);
+ return;
+ }
+ }
+
+ cancellation.run();
+ }
+ }
+
+ boolean isCancelled() {
+ return cancelFut != null;
+ }
+
+ @SuppressWarnings("rawtypes")
+ void cancel() {
+ if (cancelFut != null) {
+ return;
+ }
+
+ synchronized (mux) {
+ if (cancelFut != null) {
+ return;
+ }
+
+ // First assemble all completion futures
+ CompletableFuture[] futures = cancellations.stream()
+ .map(c -> c.completionFut)
+ .toArray(CompletableFuture[]::new);
+
+ // handle.cancelFut completes when all cancellation futures
complete.
+ cancelFut = CompletableFuture.allOf(futures).whenComplete((r,
t) -> {
+ handle.cancelFut.complete(null);
+ });
+ }
+
+ IgniteException error = null;
+
+ // Run cancellation actions outside of lock
+ for (Cancellation cancellation : cancellations) {
+ try {
+ cancellation.run();
+ } catch (Throwable t) {
+ if (error == null) {
+ error = new IgniteException(Common.INTERNAL_ERR,
"Failed to cancel an operation");
+ }
+ error.addSuppressed(t);
+ }
+ }
+
+ if (error != null) {
+ throw error;
+ }
+ }
+ }
+
+ /**
+ * Stores an action that triggers a cancellation and a completable future
that completes when a resource is closed.
+ */
+ private static class Cancellation {
+
+ private final Runnable cancelAction;
+
+ private final CompletableFuture<?> completionFut;
+
+ private Cancellation(Runnable cancelAction, CompletableFuture<?>
completionFut) {
+ this.cancelAction = cancelAction;
+ this.completionFut = completionFut;
+ }
+
+ private void run() {
+ cancelAction.run();
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
b/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
similarity index 68%
copy from
modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
copy to modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
index 5f6cd721e1..52857cd317 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.util;
+package org.apache.ignite.lang;
/**
- * A {@code Cancellable} represents a process or an operation that can be
canceled.
+ * Cancellation token is an object that is issued by {@link CancelHandle} and
can be used by an operation or a resource to observe a signal
+ * to terminate it.
*/
-public interface Cancellable {
- /**
- * Cancels the ongoing operation or process or do nothing if it has been
already cancelled.
- *
- * @param timeout If process was cancelled due to timeout.
- */
- void cancel(boolean timeout);
+public interface CancellationToken {
}
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java
b/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java
index d2beea304d..fecd4bfca1 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java
@@ -18,6 +18,7 @@
package org.apache.ignite.sql;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
@@ -52,7 +53,26 @@ public interface IgniteSql {
* @return SQL query result set.
* @throws SqlException If failed.
*/
- ResultSet<SqlRow> execute(@Nullable Transaction transaction, String query,
@Nullable Object... arguments);
+ default ResultSet<SqlRow> execute(@Nullable Transaction transaction,
String query, @Nullable Object... arguments) {
+ return execute(transaction, (CancellationToken) null, query,
arguments);
+ }
+
+ /**
+ * Executes a single SQL query.
+ *
+ * @param transaction Transaction to execute the query within or {@code
null}.
+ * @param cancellationToken Cancellation token or {@code null}.
+ * @param query SQL query template.
+ * @param arguments Arguments for the template (optional).
+ * @return SQL query result set.
+ * @throws SqlException If failed.
+ */
+ ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ );
/**
* Executes a single SQL statement.
@@ -62,12 +82,54 @@ public interface IgniteSql {
* @param arguments Arguments for the statement.
* @return SQL query result set.
*/
- ResultSet<SqlRow> execute(@Nullable Transaction transaction, Statement
statement, @Nullable Object... arguments);
+ default ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return execute(transaction, (CancellationToken) null, statement,
arguments);
+ }
+
+ /**
+ * Executes a single SQL statement.
+ *
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param cancellationToken Cancellation token or {@code null}.
+ * @param statement SQL statement to execute.
+ * @param arguments Arguments for the statement.
+ * @return SQL query result set.
+ */
+ ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ @Nullable Object... arguments
+ );
+
+ /**
+ * Executes single SQL statement and maps results to objects with the
provided mapper.
+ *
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
+ * @param query SQL query template.
+ * @param arguments Arguments for the statement.
+ * @param <T> A type of object contained in result set.
+ * @return SQL query results set.
+ */
+ default <T> ResultSet<T> execute(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return execute(transaction, mapper, null, query, arguments);
+ }
/**
* Executes single SQL statement and maps results to objects with the
provided mapper.
*
* @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param cancellationToken Cancellation token or {@code null}.
* @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
* @param query SQL query template.
* @param arguments Arguments for the statement.
@@ -77,8 +139,10 @@ public interface IgniteSql {
<T> ResultSet<T> execute(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
String query,
- @Nullable Object... arguments);
+ @Nullable Object... arguments
+ );
/**
* Executes single SQL statement and maps results to objects with the
provided mapper.
@@ -90,27 +154,128 @@ public interface IgniteSql {
* @param <T> A type of object contained in result set.
* @return SQL query results set.
*/
+ default <T> ResultSet<T> execute(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return execute(transaction, mapper, null, statement, arguments);
+ }
+
+ /**
+ * Executes single SQL statement and maps results to objects with the
provided mapper.
+ *
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param cancellationToken Cancellation token or {@code null}.
+ * @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
+ * @param statement SQL statement to execute.
+ * @param arguments Arguments for the statement.
+ * @param <T> A type of object contained in result set.
+ * @return SQL query results set.
+ */
<T> ResultSet<T> execute(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
- @Nullable Object... arguments);
+ @Nullable Object... arguments
+ );
+
+ /**
+ * Executes SQL query in an asynchronous way.
+ *
+ * @param transaction Transaction to execute the query within or {@code
null}.
+ * @param query SQL query template.
+ * @param arguments Arguments for the template (optional).
+ * @return Operation future.
+ * @throws SqlException If failed.
+ */
+ default CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return executeAsync(transaction, (CancellationToken) null, query,
arguments);
+ }
/**
* Executes SQL query in an asynchronous way.
*
* @param transaction Transaction to execute the query within or {@code
null}.
+ * @param cancellationToken Cancellation token or {@code null}.
* @param query SQL query template.
* @param arguments Arguments for the template (optional).
* @return Operation future.
* @throws SqlException If failed.
*/
- CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(@Nullable
Transaction transaction, String query, @Nullable Object... arguments);
+ CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ );
+
+ /**
+ * Executes an SQL statement asynchronously.
+ *
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param statement SQL statement to execute.
+ * @param arguments Arguments for the statement.
+ * @return Operation future.
+ * @throws SqlException If failed.
+ */
+ default CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return executeAsync(transaction, (CancellationToken) null, statement,
arguments);
+ }
+
+ /**
+ * Executes SQL statement in an asynchronous way and maps results to
objects with the provided mapper.
+ *
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
+ * @param query SQL query template.
+ * @param arguments Arguments for the statement.
+ * @param <T> A type of object contained in result set.
+ * @return Operation future.
+ */
+ default <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return executeAsync(transaction, mapper, null, query, arguments);
+ }
+
+ /**
+ * Executes SQL statement in an asynchronous way and maps results to
objects with the provided mapper.
+ *
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
+ * @param statement SQL statement to execute.
+ * @param arguments Arguments for the statement.
+ * @param <T> A type of object contained in result set.
+ * @return Operation future.
+ */
+ default <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return executeAsync(transaction, mapper, null, statement, arguments);
+ }
/**
* Executes an SQL statement asynchronously.
*
* @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param cancellationToken Cancellation token or {@code null}.
* @param statement SQL statement to execute.
* @param arguments Arguments for the statement.
* @return Operation future.
@@ -118,13 +283,16 @@ public interface IgniteSql {
*/
CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
- @Nullable Object... arguments);
+ @Nullable Object... arguments
+ );
/**
* Executes SQL statement in an asynchronous way and maps results to
objects with the provided mapper.
*
* @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param cancellationToken Cancellation token or {@code null}.
* @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
* @param query SQL query template.
* @param arguments Arguments for the statement.
@@ -134,13 +302,16 @@ public interface IgniteSql {
<T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
String query,
- @Nullable Object... arguments);
+ @Nullable Object... arguments
+ );
/**
* Executes SQL statement in an asynchronous way and maps results to
objects with the provided mapper.
*
* @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param cancellationToken Cancellation token or {@code null}.
* @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
* @param statement SQL statement to execute.
* @param arguments Arguments for the statement.
@@ -150,8 +321,10 @@ public interface IgniteSql {
<T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
- @Nullable Object... arguments);
+ @Nullable Object... arguments
+ );
/**
* Executes a batched SQL query. Only DML queries are supported.
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index a73bad6915..75a005a7c7 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -154,6 +154,7 @@ public class JdbcQueryEventHandlerImpl extends
JdbcHandlerBase implements JdbcQu
properties,
igniteTransactions.observableTimestampTracker(),
tx,
+ null,
req.sqlQuery(),
req.arguments() == null ? OBJECT_EMPTY_ARRAY : req.arguments()
);
@@ -280,6 +281,7 @@ public class JdbcQueryEventHandlerImpl extends
JdbcHandlerBase implements JdbcQu
properties,
igniteTransactions.observableTimestampTracker(),
tx,
+ null,
sql,
arg == null ? OBJECT_EMPTY_ARRAY : arg
);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index c3fa0795fe..1cae217b90 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -167,8 +167,13 @@ public class ClientSqlExecuteRequest {
.build();
CompletableFuture<AsyncResultSet<SqlRow>> fut = qryProc.queryAsync(
- properties,
transactions.observableTimestampTracker(), (InternalTransaction) transaction,
query, arguments)
- .thenCompose(cur -> cur.requestNextAsync(pageSize)
+ properties,
+ transactions.observableTimestampTracker(),
+ (InternalTransaction) transaction,
+ null,
+ query,
+ arguments
+ ).thenCompose(cur -> cur.requestNextAsync(pageSize)
.thenApply(
batchRes -> new AsyncResultSetImpl<>(
cur,
diff --git
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
index d26a857dee..7ece7f2ccb 100644
---
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
+++
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java
@@ -162,7 +162,7 @@ class JdbcQueryEventHandlerImplTest extends
BaseIgniteAbstractTest {
@Test
public void singleTxUsedForMultipleOperations() {
- when(queryProcessor.queryAsync(any(), any(), any(), any(),
any(Object[].class)))
+ when(queryProcessor.queryAsync(any(), any(), any(), any(), any(),
any(Object[].class)))
.thenReturn(CompletableFuture.failedFuture(new
RuntimeException("Expected")));
InternalTransaction tx = mock(InternalTransaction.class);
@@ -196,7 +196,7 @@ class JdbcQueryEventHandlerImplTest extends
BaseIgniteAbstractTest {
verify(igniteTransactions, times(5)).observableTimestampTracker();
verifyNoMoreInteractions(igniteTransactions);
- verify(queryProcessor, times(5)).queryAsync(any(), any(), any(),
any(), any(Object[].class));
+ verify(queryProcessor, times(5)).queryAsync(any(), any(), any(),
any(), any(), any(Object[].class));
}
private long acquireConnectionId() {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 437b945c4d..60e5175a13 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.sql.StatementBuilderImpl;
import org.apache.ignite.internal.sql.StatementImpl;
import org.apache.ignite.internal.sql.SyncResultSetAdapter;
import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
@@ -91,11 +92,16 @@ public class ClientSql implements IgniteSql {
/** {@inheritDoc} */
@Override
- public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String
query, @Nullable Object... arguments) {
+ public ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ ) {
Objects.requireNonNull(query);
try {
- return new SyncResultSetAdapter<>(executeAsync(transaction, query,
arguments).join());
+ return new SyncResultSetAdapter<>(executeAsync(transaction,
cancellationToken, query, arguments).join());
} catch (CompletionException e) {
throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
@@ -103,11 +109,16 @@ public class ClientSql implements IgniteSql {
/** {@inheritDoc} */
@Override
- public ResultSet<SqlRow> execute(@Nullable Transaction transaction,
Statement statement, @Nullable Object... arguments) {
+ public ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
Objects.requireNonNull(statement);
try {
- return new SyncResultSetAdapter<>(executeAsync(transaction,
statement, arguments).join());
+ return new SyncResultSetAdapter<>(executeAsync(transaction,
cancellationToken, statement, arguments).join());
} catch (CompletionException e) {
throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
@@ -118,12 +129,14 @@ public class ClientSql implements IgniteSql {
public <T> ResultSet<T> execute(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
String query,
- @Nullable Object... arguments) {
+ @Nullable Object... arguments
+ ) {
Objects.requireNonNull(query);
try {
- return new SyncResultSetAdapter<>(executeAsync(transaction,
mapper, query, arguments).join());
+ return new SyncResultSetAdapter<>(executeAsync(transaction,
mapper, cancellationToken, query, arguments).join());
} catch (CompletionException e) {
throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
@@ -134,12 +147,14 @@ public class ClientSql implements IgniteSql {
public <T> ResultSet<T> execute(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
- @Nullable Object... arguments) {
+ @Nullable Object... arguments
+ ) {
Objects.requireNonNull(statement);
try {
- return new SyncResultSetAdapter<>(executeAsync(transaction,
mapper, statement, arguments).join());
+ return new SyncResultSetAdapter<>(executeAsync(transaction,
mapper, cancellationToken, statement, arguments).join());
} catch (CompletionException e) {
throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
@@ -177,22 +192,24 @@ public class ClientSql implements IgniteSql {
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
String query,
@Nullable Object... arguments) {
Objects.requireNonNull(query);
StatementImpl statement = new StatementImpl(query);
- return executeAsync(transaction, statement, arguments);
+ return executeAsync(transaction, cancellationToken, statement,
arguments);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments) {
- return executeAsync(transaction, sqlRowMapper, statement, arguments);
+ return executeAsync(transaction, sqlRowMapper, cancellationToken,
statement, arguments);
}
/** {@inheritDoc} */
@@ -200,13 +217,14 @@ public class ClientSql implements IgniteSql {
public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
String query,
@Nullable Object... arguments) {
Objects.requireNonNull(query);
StatementImpl statement = new StatementImpl(query);
- return executeAsync(transaction, mapper, statement, arguments);
+ return executeAsync(transaction, mapper, cancellationToken, statement,
arguments);
}
/** {@inheritDoc} */
@@ -214,6 +232,7 @@ public class ClientSql implements IgniteSql {
public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments) {
Objects.requireNonNull(statement);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index 6163a589bb..6c5eef1132 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -32,6 +32,7 @@ import
org.apache.ignite.internal.sql.engine.property.SqlProperties;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
@@ -44,8 +45,12 @@ public class FakeIgniteQueryProcessor implements
QueryProcessor {
String lastScript;
@Override
- public CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties
properties,
- @Nullable InternalTransaction transaction, String qry, Object...
params) {
+ public CompletableFuture<QueryMetadata> prepareSingleAsync(
+ SqlProperties properties,
+ @Nullable InternalTransaction transaction,
+ String qry,
+ Object... params
+ ) {
throw new UnsupportedOperationException();
}
@@ -54,6 +59,7 @@ public class FakeIgniteQueryProcessor implements
QueryProcessor {
SqlProperties properties,
HybridTimestampTracker observableTimeTracker,
@Nullable InternalTransaction transaction,
+ @Nullable CancellationToken cancellationToken,
String qry,
Object... params
) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
index 5f6cd721e1..f5d59fc717 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.util;
/**
* A {@code Cancellable} represents a process or an operation that can be
canceled.
*/
+@FunctionalInterface
public interface Cancellable {
/**
* Cancels the ongoing operation or process or do nothing if it has been
already cancelled.
diff --git
a/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
new file mode 100644
index 0000000000..933f0338e8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.CancelHandleImpl.CancellationTokenImpl;
+
+/**
+ * Utility class to provide direct access to internals of {@link
CancelHandleImpl}.
+ */
+public final class CancelHandleHelper {
+
+ private CancelHandleHelper() {
+
+ }
+
+ /**
+ * Attaches a cancellable operation to the given token. A cancellation
procedure started by its handle completes
+ * when {@code completionFut} completes.
+ *
+ * <p>NOTE: If the handle this token is associated with was cancelled or
its cancellation was requested,
+ * this method immediately invokes {@code cancelAction.run()} and in this
case
+ * <b>it never waits for {@code completionFut} to complete</b>.
+ *
+ * <p>The following methods request cancellation of a handle:
+ * <ul>
+ * <li>{@link CancelHandle#cancel()}</li>
+ * <li>{@link CancelHandle#cancelAsync()}</li>
+ * </ul>
+ *
+ * @param token Cancellation token.
+ * @param cancelAction Action that terminates an operation.
+ * @param completionFut Future that completes when operation completes and
all resources it created are released.
+ */
+ public static void addCancelAction(
+ CancellationToken token,
+ Runnable cancelAction,
+ CompletableFuture<?> completionFut
+ ) {
+ Objects.requireNonNull(token, "token");
+ Objects.requireNonNull(cancelAction, "cancelAction");
+ Objects.requireNonNull(completionFut, "completionFut");
+
+ CancellationTokenImpl t = unwrapToken(token);
+ t.addCancelAction(cancelAction, completionFut);
+ }
+
+ private static CancellationTokenImpl unwrapToken(CancellationToken token) {
+ if (token instanceof CancellationTokenImpl) {
+ return (CancellationTokenImpl) token;
+ } else {
+ throw new IllegalArgumentException("Unexpected CancellationToken:
" + token.getClass());
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
new file mode 100644
index 0000000000..c2b8ddb43e
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link CancelHandleHelper}.
+ */
+public class CancelHandleHelperSelfTest extends BaseIgniteAbstractTest {
+
+ @Test
+ public void testCancelSync() throws InterruptedException {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ // Initially is not cancelled
+ assertFalse(cancelHandle.isCancelled());
+
+ CountDownLatch operationLatch = new CountDownLatch(1);
+ CompletableFuture<Void> cancelFut = new CompletableFuture<>();
+
+ Runnable cancelAction = () -> {
+ try {
+ operationLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ cancelFut.complete(null);
+ };
+
+ CancelHandleHelper.addCancelAction(token, cancelAction, cancelFut);
+
+ CountDownLatch cancelHandleLatch = new CountDownLatch(1);
+
+ // Call cancel in another thread.
+ Thread thread = new Thread(() -> {
+ cancelHandle.cancel();
+ cancelHandleLatch.countDown();
+ });
+ thread.start();
+
+ // Make it possible for cancelAction to complete, because cancelHandle
calls it in its thread.
+ operationLatch.countDown();
+
+ // Wait until sync cancel returns.
+ cancelHandleLatch.await();
+
+ // Cancellation has completed
+ assertTrue(cancelHandle.cancelAsync().isDone());
+ assertTrue(cancelHandle.isCancelled());
+ assertTrue(cancelHandle.cancelAsync().isDone());
+
+ // Should have no effect
+ cancelHandle.cancel();
+ }
+
+ @Test
+ public void testCancelAsync() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ // Initially is not cancelled
+ assertFalse(cancelHandle.isCancelled());
+ CountDownLatch operationLatch = new CountDownLatch(1);
+ CompletableFuture<Void> cancelFut = new CompletableFuture<>();
+
+ Runnable cancelAction = () -> {
+ // Run in another thread to avoid blocking.
+ Thread thread = new Thread(() -> {
+ try {
+ operationLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ cancelFut.complete(null);
+ });
+ thread.start();
+ };
+
+ CancelHandleHelper.addCancelAction(token, cancelAction, cancelFut);
+
+ // Request cancellation and keep the future, to wait on it later.
+ CompletableFuture<Void> cancelHandleFut = cancelHandle.cancelAsync();
+ assertTrue(cancelHandle.isCancelled());
+
+ assertFalse(cancelHandleFut.isDone());
+ operationLatch.countDown();
+
+ // Wait for cancellation to complete
+ cancelHandleFut.join();
+
+ assertTrue(cancelHandle.isCancelled());
+ assertTrue(cancelHandle.cancelAsync().isDone());
+ }
+
+ @Test
+ public void testCancelAsyncReturnsCopy() {
+ CancelHandle cancelHandle = CancelHandle.create();
+
+ CompletableFuture<Void> f1 = cancelHandle.cancelAsync();
+ CompletableFuture<Void> f2 = cancelHandle.cancelAsync();
+ assertNotSame(f1, f2);
+ }
+
+ @Test
+ public void testRunCancelActionImmediatelyIfCancelSyncCalled() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ cancelHandle.cancel();
+ assertTrue(cancelHandle.isCancelled());
+
+ Runnable action = Mockito.mock(Runnable.class);
+ CompletableFuture<Void> f = new CompletableFuture<>();
+
+ // Attach it to some operation that hasn't completed yet
+ CancelHandleHelper.addCancelAction(token, action, f);
+ verify(action, times(1)).run();
+
+ cancelHandle.cancelAsync().join();
+ // We do not wait for cancellation to complete because
+ // operation has not started yet.
+ assertFalse(f.isDone());
+
+ // Action runs immediately
+ CancelHandleHelper.addCancelAction(token, action, f);
+ verify(action, times(2)).run();
+ }
+
+ @Test
+ public void testRunCancelActionImmediatelyIfCancelAsyncCalled() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ cancelHandle.cancelAsync();
+ assertTrue(cancelHandle.isCancelled());
+
+ Runnable action = Mockito.mock(Runnable.class);
+ CompletableFuture<Void> f = new CompletableFuture<>();
+
+ // Attach it to some operation that hasn't completed yet
+ CancelHandleHelper.addCancelAction(token, action, f);
+ verify(action, times(1)).run();
+
+ cancelHandle.cancelAsync().join();
+ // We do not wait for cancellation to complete because
+ // operation has not started yet.
+ assertFalse(f.isDone());
+
+ // Action runs immediately
+ CancelHandleHelper.addCancelAction(token, action, f);
+ verify(action, times(2)).run();
+ }
+
+ @Test
+ public void testArgumentsMustNotBeNull() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+ Runnable action = Mockito.mock(Runnable.class);
+ CompletableFuture<Void> f = CompletableFuture.completedFuture(null);
+
+ {
+ NullPointerException err = assertThrows(
+ NullPointerException.class,
+ () -> CancelHandleHelper.addCancelAction(null, action, f)
+ );
+ assertEquals("token", err.getMessage());
+ }
+
+ {
+ NullPointerException err = assertThrows(
+ NullPointerException.class,
+ () -> CancelHandleHelper.addCancelAction(token, null, f)
+ );
+ assertEquals("cancelAction", err.getMessage());
+ }
+
+ {
+ NullPointerException err = assertThrows(
+ NullPointerException.class,
+ () -> CancelHandleHelper.addCancelAction(token, action,
null)
+ );
+ assertEquals("completionFut", err.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultipleOperations() {
+ class Operation {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final CompletableFuture<Void> cancelFut = new
CompletableFuture<>();
+ private final Runnable cancelAction = () -> {
+ // Run in another thread to avoid blocking.
+ Thread thread = new Thread(() -> {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ cancelFut.complete(null);
+ });
+ thread.start();
+ };
+ }
+
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ Operation operation1 = new Operation();
+ Operation operation2 = new Operation();
+
+ CancelHandleHelper.addCancelAction(token, operation1.cancelAction,
operation1.cancelFut);
+ CancelHandleHelper.addCancelAction(token, operation2.cancelAction,
operation2.cancelFut);
+
+ cancelHandle.cancelAsync();
+ assertFalse(operation1.cancelFut.isDone());
+
+ // Cancel the first operation
+ operation1.latch.countDown();
+ operation1.cancelFut.join();
+
+ // The cancelHandle is still not done
+ assertFalse(cancelHandle.cancelAsync().isDone());
+
+ // Cancel the second operation
+ operation2.latch.countDown();
+ operation2.cancelFut.join();
+
+ cancelHandle.cancelAsync().join();
+ assertTrue(cancelHandle.cancelAsync().isDone());
+ }
+
+ @Test
+ public void testExceptionsInCancelActionsAreWrapped() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ RuntimeException e1 = new RuntimeException("e1");
+ Runnable r1 = () -> {
+ throw e1;
+ };
+ CompletableFuture<Object> f1 = new CompletableFuture<>();
+
+ RuntimeException e2 = new RuntimeException("e2");
+ Runnable r2 = () -> {
+ throw e2;
+ };
+ CompletableFuture<Object> f2 = new CompletableFuture<>();
+
+ CancelHandleHelper.addCancelAction(token, r1, f1);
+ CancelHandleHelper.addCancelAction(token, r2, f2);
+
+ f1.complete(null);
+ f2.complete(null);
+
+ IgniteException err = assertThrows(IgniteException.class,
cancelHandle::cancel);
+
+ assertEquals("Failed to cancel an operation", err.getMessage());
+ assertEquals(Common.INTERNAL_ERR, err.code(), err.toString());
+ assertEquals(Arrays.asList(e1, e2),
Arrays.asList(err.getSuppressed()));
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index 16117bf584..bde9e1b730 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -90,7 +90,7 @@ public class AbstractMultiNodeBenchmark {
getAllFromCursor(
await(queryEngine.queryAsync(
- SqlPropertiesHelper.emptyProperties(),
igniteImpl.observableTimeTracker(), null, createZoneStatement
+ SqlPropertiesHelper.emptyProperties(),
igniteImpl.observableTimeTracker(), null, null, createZoneStatement
))
);
@@ -136,7 +136,7 @@ public class AbstractMultiNodeBenchmark {
getAllFromCursor(
await(igniteImpl.queryEngine().queryAsync(
- SqlPropertiesHelper.emptyProperties(),
igniteImpl.observableTimeTracker(), null, createTableStatement
+ SqlPropertiesHelper.emptyProperties(),
igniteImpl.observableTimeTracker(), null, null, createTableStatement
))
);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
index a33198c32c..52de6ec493 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java
@@ -236,11 +236,11 @@ public class SelectBenchmark extends
AbstractMultiNodeBenchmark {
}
private Iterator<InternalSqlRow> query(String sql, Object... args) {
- return handleFirstBatch(queryProc.queryAsync(properties,
igniteImpl.observableTimeTracker(), null, sql, args));
+ return handleFirstBatch(queryProc.queryAsync(properties,
igniteImpl.observableTimeTracker(), null, null, sql, args));
}
private Iterator<InternalSqlRow> script(String sql, Object... args) {
- return handleFirstBatch(queryProc.queryAsync(scriptProperties,
igniteImpl.observableTimeTracker(), null, sql, args));
+ return handleFirstBatch(queryProc.queryAsync(scriptProperties,
igniteImpl.observableTimeTracker(), null, null, sql, args));
}
private Iterator<InternalSqlRow>
handleFirstBatch(CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFut) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java
index fe8059283a..2ccb23aa71 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java
@@ -380,14 +380,14 @@ public class SqlMultiStatementBenchmark extends
AbstractMultiNodeBenchmark {
Iterator<InternalSqlRow> execQuery(String sql, Object ... args) {
AsyncSqlCursor<InternalSqlRow> cursor =
- queryProcessor.queryAsync(props, observableTimeTracker,
null, sql, args).join();
+ queryProcessor.queryAsync(props, observableTimeTracker,
null, null, sql, args).join();
return new InternalResultsIterator(cursor, pageSize);
}
Iterator<InternalSqlRow> execScript(String sql, Object ... args) {
AsyncSqlCursor<InternalSqlRow> cursor =
- queryProcessor.queryAsync(scriptProps,
observableTimeTracker, null, sql, args).join();
+ queryProcessor.queryAsync(scriptProps,
observableTimeTracker, null, null, sql, args).join();
return new InternalResultsIterator(cursor, pageSize);
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
index e120ebfa19..f56e4cc5b6 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.wrapper.Wrapper;
import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
@@ -58,71 +59,126 @@ class RestartProofIgniteSql implements IgniteSql, Wrapper {
}
@Override
- public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String
query, @Nullable Object... arguments) {
- return attachmentLock.attached(ignite ->
ignite.sql().execute(transaction, query, arguments));
+ public ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return attachmentLock.attached(ignite ->
ignite.sql().execute(transaction, cancellationToken, query, arguments));
}
@Override
- public ResultSet<SqlRow> execute(@Nullable Transaction transaction,
Statement statement, @Nullable Object... arguments) {
- return attachmentLock.attached(ignite ->
ignite.sql().execute(transaction, statement, arguments));
+ public ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return attachmentLock.attached(ignite -> ignite.sql().execute(
+ transaction,
+ cancellationToken,
+ statement,
+ arguments)
+ );
}
@Override
public <T> ResultSet<T> execute(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
String query,
@Nullable Object... arguments
) {
- return attachmentLock.attached(ignite ->
ignite.sql().execute(transaction, mapper, query, arguments));
+ return attachmentLock.attached(ignite -> ignite.sql().execute(
+ transaction,
+ mapper,
+ cancellationToken,
+ query,
+ arguments)
+ );
}
@Override
public <T> ResultSet<T> execute(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments
) {
- return attachmentLock.attached(ignite ->
ignite.sql().execute(transaction, mapper, statement, arguments));
+ return attachmentLock.attached(ignite -> ignite.sql().execute(
+ transaction,
+ mapper,
+ cancellationToken,
+ statement,
+ arguments)
+ );
}
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
String query,
@Nullable Object... arguments
) {
- return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(transaction, query, arguments));
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(
+ transaction,
+ cancellationToken,
+ query,
+ arguments)
+ );
}
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments
) {
- return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(transaction, statement, arguments));
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(
+ transaction,
+ cancellationToken,
+ statement,
+ arguments)
+ );
}
@Override
public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
String query,
@Nullable Object... arguments
) {
- return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(transaction, mapper, query, arguments));
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(
+ transaction,
+ mapper,
+ cancellationToken,
+ query,
+ arguments)
+ );
}
@Override
public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments
) {
- return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(transaction, mapper, statement, arguments));
+ return attachmentLock.attachedAsync(ignite ->
ignite.sql().executeAsync(
+ transaction,
+ mapper,
+ cancellationToken,
+ statement,
+ arguments)
+ );
}
@Override
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 9446c7ae8d..a888854552 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -20,28 +20,37 @@ package org.apache.ignite.internal.sql.api;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.ignite.internal.sql.SyncResultSetAdapter;
import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
/**
* Tests for asynchronous SQL API.
@@ -95,6 +104,95 @@ public class ItSqlAsynchronousApiTest extends
ItSqlApiBaseTest {
}
}
+ @Test
+ public void cancelQueryString() throws InterruptedException {
+ IgniteSql sql = igniteSql();
+ String query = "SELECT * FROM system_range(0, 10000000000)";
+
+ // no transaction
+ executeAndCancel((token) -> {
+ return sql.executeAsync(null, token, query);
+ });
+
+ // with transaction
+ executeAndCancel((token) -> {
+ Transaction transaction = igniteTx().begin();
+
+ return sql.executeAsync(transaction, token, query);
+ });
+ }
+
+ @Test
+ public void cancelStatement() throws InterruptedException {
+ IgniteSql sql = igniteSql();
+
+ String query = "SELECT * FROM system_range(0, 10000000000)";
+
+ // no transaction
+ executeAndCancel((token) -> {
+ Statement statement = sql.statementBuilder()
+ .query(query)
+ .build();
+
+ return sql.executeAsync(null, token, statement);
+ });
+
+ // with transaction
+ executeAndCancel((token) -> {
+ Statement statement = sql.statementBuilder()
+ .query(query)
+ .build();
+
+ Transaction transaction = igniteTx().begin();
+
+ return sql.executeAsync(transaction, token, statement);
+ });
+ }
+
+ private static void executeAndCancel(
+ Function<CancellationToken,
CompletableFuture<AsyncResultSet<SqlRow>>> execute
+ ) throws InterruptedException {
+
+ CancelHandle cancelHandle = CancelHandle.create();
+ CountDownLatch firstResultSetLatch = new CountDownLatch(1);
+
+ CompletableFuture<AsyncResultSet<SqlRow>> resultSetFut =
execute.apply(cancelHandle.token());
+
+ // Wait for the first result set to become available and then cancel
the query
+ resultSetFut.whenComplete((r, t) -> firstResultSetLatch.countDown());
+ firstResultSetLatch.await();
+
+ cancelHandle.cancelAsync();
+
+ CompletionException err = assertThrows(CompletionException.class, new
DrainResultSet(resultSetFut));
+ SqlException sqlErr = assertInstanceOf(SqlException.class,
err.getCause());
+ assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlErr.code());
+
+ cancelHandle.cancelAsync().join();
+ }
+
+ private static class DrainResultSet implements Executable {
+ CompletableFuture<? extends AsyncResultSet<SqlRow>> rs;
+
+ DrainResultSet(CompletableFuture<? extends AsyncResultSet<SqlRow>> rs)
{
+ this.rs = rs;
+ }
+
+ @Override
+ public void execute() {
+ AsyncResultSet<SqlRow> current;
+ do {
+ current = rs.join();
+ if (current.hasRowSet() && current.hasMorePages()) {
+ rs = current.fetchNextPage();
+ } else {
+ return;
+ }
+
+ } while (current.hasMorePages());
+ }
+ }
+
@Override
protected ResultSet<SqlRow> executeForRead(IgniteSql sql, Transaction tx,
Statement statement, Object... args) {
return new SyncResultSetAdapter(await(sql.executeAsync(tx, statement,
args)));
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index 22564c4121..8a16f708f1 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
/**
* Tests for asynchronous client SQL API.
@@ -40,6 +41,18 @@ public class ItSqlClientAsynchronousApiTest extends
ItSqlAsynchronousApiTest {
client.close();
}
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
+ @Override
+ public void cancelQueryString() throws InterruptedException {
+ super.cancelQueryString();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
+ @Override
+ public void cancelStatement() throws InterruptedException {
+ super.cancelStatement();
+ }
+
@Override
protected IgniteSql igniteSql() {
return client.sql();
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 283fb792e6..cdcba98bb2 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
/**
* Tests for synchronous client SQL API.
@@ -40,6 +41,18 @@ public class ItSqlClientSynchronousApiTest extends
ItSqlSynchronousApiTest {
client.close();
}
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
+ @Override
+ public void cancelQueryString() throws InterruptedException {
+ super.cancelQueryString();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
+ @Override
+ public void cancelStatement() throws InterruptedException {
+ super.cancelStatement();
+ }
+
@Override
protected IgniteSql igniteSql() {
return client.sql();
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index 329f82745e..f7cb46f1d9 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -19,20 +19,108 @@ package org.apache.ignite.internal.sql.api;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
/**
* Tests for synchronous SQL API.
*/
public class ItSqlSynchronousApiTest extends ItSqlApiBaseTest {
+
+ private final List<Transaction> transactions = new ArrayList<>();
+
+ @AfterEach
+ public void rollbackTransactions() {
+ transactions.forEach(Transaction::rollback);
+ }
+
+ @Test
+ public void cancelQueryString() throws InterruptedException {
+ IgniteSql sql = igniteSql();
+ String query = "SELECT * FROM system_range(0, 10000000000)";
+
+ // no transaction
+ executeAndCancel((token) -> {
+ Statement statement = sql.statementBuilder()
+ .query(query)
+ .build();
+
+ return sql.execute(null, token, statement);
+ });
+
+ // with transaction
+ executeAndCancel((token) -> {
+ Statement statement = sql.statementBuilder()
+ .query(query)
+ .build();
+
+ Transaction transaction = igniteTx().begin();
+ return sql.execute(transaction, token, statement);
+ });
+ }
+
+ @Test
+ public void cancelStatement() throws InterruptedException {
+ IgniteSql sql = igniteSql();
+ String query = "SELECT * FROM system_range(0, 10000000000)";
+
+ // no transaction
+ executeAndCancel((token) -> {
+ return sql.execute(null, token, query);
+ });
+
+ // with transaction
+ executeAndCancel((token) -> {
+ Transaction transaction = igniteTx().begin();
+ return sql.execute(transaction, token, query);
+ });
+ }
+
+ private static void executeAndCancel(Function<CancellationToken,
ResultSet<SqlRow>> execute) throws InterruptedException {
+ CancelHandle cancelHandle = CancelHandle.create();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // Run statement in another thread
+ CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
+ try (ResultSet<SqlRow> resultSet =
execute.apply(cancelHandle.token())) {
+ // Start a query and cancel it later.
+ latch.countDown();
+ while (resultSet.hasNext()) {
+ resultSet.next();
+ }
+ }
+ });
+
+ latch.await();
+ cancelHandle.cancelAsync();
+
+ CompletionException err = assertThrows(CompletionException.class,
f::join);
+ SqlException sqlErr = assertInstanceOf(SqlException.class,
err.getCause());
+ assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlErr.code());
+
+ cancelHandle.cancelAsync().join();
+ }
+
@Override
protected ResultSet<SqlRow> executeForRead(IgniteSql sql, Transaction tx,
Statement statement, Object... args) {
return sql.execute(tx, statement, args);
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java
index 23ed7fc3d8..9f40678fd1 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java
@@ -82,7 +82,7 @@ public abstract class BaseSqlMultiStatementTest extends
BaseSqlIntegrationTest {
.build();
AsyncSqlCursor<InternalSqlRow> cursor = await(
- queryProcessor().queryAsync(properties,
observableTimeTracker(), tx, query, params)
+ queryProcessor().queryAsync(properties,
observableTimeTracker(), tx, null, query, params)
);
return Objects.requireNonNull(cursor);
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItQueryCancelTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItQueryCancelTest.java
new file mode 100644
index 0000000000..6c47f523e5
--- /dev/null
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItQueryCancelTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static
org.apache.ignite.internal.sql.engine.QueryProperty.ALLOWED_QUERY_TYPES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.sql.engine.property.SqlProperties;
+import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.apache.ignite.sql.SqlException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** Set of test cases for query cancellation. */
+public class ItQueryCancelTest extends BaseSqlIntegrationTest {
+
+ /** Calling {@link CancelHandle#cancel()} should cancel execution of a
single query. */
+ @Test
+ public void testCancelSingleQuery() {
+ IgniteImpl igniteImpl = TestWrappers.unwrapIgniteImpl(CLUSTER.node(0));
+
+ SqlProperties properties = SqlPropertiesHelper.newBuilder()
+ .set(ALLOWED_QUERY_TYPES, Set.of(SqlQueryType.QUERY))
+ .build();
+
+ HybridTimestampTracker hybridTimestampTracker =
igniteImpl.observableTimeTracker();
+
+ SqlQueryProcessor qryProc = queryProcessor();
+
+ StringBuilder query = new StringBuilder();
+
+ query.append("SELECT v FROM (VALUES ");
+ for (int i = 0; i < 100; i++) {
+ if (i > 0) {
+ query.append(", ");
+ }
+ query.append("(");
+ query.append(i);
+ query.append(")");
+ }
+ query.append(" ) t(v)");
+
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ AsyncSqlCursor<InternalSqlRow> query1 = qryProc.queryAsync(
+ properties,
+ hybridTimestampTracker,
+ null,
+ token,
+ query.toString()
+ ).join();
+
+ assertEquals(1, qryProc.runningQueries());
+
+ // Request cancellation.
+ CompletableFuture<Void> cancelled = cancelHandle.cancelAsync();
+
+ // Obverse cancellation error
+ expectQueryCancelled(new DrainCursor(query1));
+
+ cancelled.join();
+
+ assertEquals(0, qryProc.runningQueries());
+ }
+
+ /** Calling {@link CancelHandle#cancel()} should cancel execution of
multiple queries. */
+ @Test
+ public void testCancelMultipleQueries() {
+ IgniteImpl igniteImpl = TestWrappers.unwrapIgniteImpl(CLUSTER.node(0));
+
+ SqlProperties properties = SqlPropertiesHelper.newBuilder()
+ .set(ALLOWED_QUERY_TYPES, Set.of(SqlQueryType.QUERY))
+ .build();
+
+ HybridTimestampTracker hybridTimestampTracker =
igniteImpl.observableTimeTracker();
+
+ SqlQueryProcessor qryProc = queryProcessor();
+
+ StringBuilder query = new StringBuilder();
+
+ query.append("SELECT v FROM (VALUES ");
+ for (int i = 0; i < 100; i++) {
+ if (i > 0) {
+ query.append(", ");
+ }
+ query.append("(");
+ query.append(i);
+ query.append(")");
+ }
+ query.append(" ) t(v)");
+
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ AsyncSqlCursor<InternalSqlRow> query1 = qryProc.queryAsync(
+ properties,
+ hybridTimestampTracker,
+ null,
+ token,
+ query.toString()
+ ).join();
+
+ AsyncSqlCursor<InternalSqlRow> query2 = qryProc.queryAsync(
+ properties,
+ hybridTimestampTracker,
+ null,
+ token,
+ query.toString()
+ ).join();
+
+ assertEquals(2, qryProc.runningQueries());
+
+ // Request cancellation.
+ CompletableFuture<Void> cancelled = cancelHandle.cancelAsync();
+
+ // Obverse cancellation errors
+ expectQueryCancelled(new DrainCursor(query1));
+ expectQueryCancelled(new DrainCursor(query2));
+
+ cancelled.join();
+
+ assertEquals(0, qryProc.runningQueries());
+ }
+
+ /** Starting a query with a cancelled token should trigger query
cancellation. */
+ @Test
+ public void testQueryWontStartWhenHandleIsCancelled() {
+ IgniteImpl igniteImpl = TestWrappers.unwrapIgniteImpl(CLUSTER.node(0));
+
+ SqlProperties properties = SqlPropertiesHelper.newBuilder()
+ .set(ALLOWED_QUERY_TYPES, Set.of(SqlQueryType.QUERY))
+ .build();
+
+ HybridTimestampTracker hybridTimestampTracker =
igniteImpl.observableTimeTracker();
+
+ SqlQueryProcessor qryProc = queryProcessor();
+
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ cancelHandle.cancel();
+
+ Runnable run = () -> qryProc.queryAsync(
+ properties,
+ hybridTimestampTracker,
+ null,
+ token,
+ "SELECT 1"
+ ).join();
+
+ expectQueryCancelledWithQueryCancelledException(run);
+
+ assertEquals(0, qryProc.runningQueries());
+ }
+
+ /** Calling {@link CancelHandle#cancel()} should cancel execution of
queries that use executable plans. */
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "SELECT * FROM t WHERE id = 1",
+ "INSERT INTO t VALUES (1, 1)",
+ "SELECT COUNT(*) FROM t",
+ })
+ public void testExecutablePlans(String query) {
+ sql("CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, val INT)");
+
+ IgniteImpl igniteImpl = TestWrappers.unwrapIgniteImpl(CLUSTER.node(0));
+
+ SqlProperties properties = SqlPropertiesHelper.newBuilder()
+ .set(ALLOWED_QUERY_TYPES, Set.of(SqlQueryType.QUERY,
SqlQueryType.DML))
+ .build();
+
+ HybridTimestampTracker hybridTimestampTracker =
igniteImpl.observableTimeTracker();
+
+ SqlQueryProcessor qryProc = queryProcessor();
+
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ Runnable run = () -> qryProc.queryAsync(
+ properties,
+ hybridTimestampTracker,
+ null,
+ token,
+ query
+ ).join();
+
+ // Request cancellation
+ CompletableFuture<Void> f = cancelHandle.cancelAsync();
+
+ // Obverse cancellation error
+ expectQueryCancelledWithQueryCancelledException(run);
+
+ f.join();
+
+ assertEquals(0, qryProc.runningQueries());
+ }
+
+ private static void expectQueryCancelled(Runnable action) {
+ CompletionException err = assertThrows(CompletionException.class,
action::run);
+ SqlException sqlErr = assertInstanceOf(SqlException.class,
err.getCause());
+ assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlErr.code(),
sqlErr.toString());
+ }
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-23637 remove this
method and use expectQueryCancelled instead.
+ private static void
expectQueryCancelledWithQueryCancelledException(Runnable action) {
+ CompletionException err = assertThrows(CompletionException.class,
action::run);
+ assertInstanceOf(QueryCancelledException.class, err.getCause());
+ }
+
+ private static class DrainCursor implements Runnable {
+ final AsyncSqlCursor<?> cursor;
+
+ DrainCursor(AsyncSqlCursor<?> cursor) {
+ this.cursor = cursor;
+ }
+
+ @Override
+ public void run() {
+ BatchedResult<?> batch;
+ do {
+ batch = cursor.requestNextAsync(1).join();
+ } while (!batch.items().isEmpty());
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index 5a2da1d991..fec0ee6785 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.TraceableException;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.IgniteSql;
@@ -201,11 +202,16 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
/** {@inheritDoc} */
@Override
- public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String
query, @Nullable Object... arguments) {
+ public ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ ) {
Objects.requireNonNull(query);
try {
- return new SyncResultSetAdapter<>(executeAsync(transaction, query,
arguments).join());
+ return new SyncResultSetAdapter<>(executeAsync(transaction,
cancellationToken, query, arguments).join());
} catch (CompletionException e) {
throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
@@ -213,11 +219,16 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
/** {@inheritDoc} */
@Override
- public ResultSet<SqlRow> execute(@Nullable Transaction transaction,
Statement statement, @Nullable Object... arguments) {
+ public ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
Objects.requireNonNull(statement);
try {
- return new SyncResultSetAdapter<>(executeAsync(transaction,
statement, arguments).join());
+ return new SyncResultSetAdapter<>(executeAsync(transaction,
cancellationToken, statement, arguments).join());
} catch (CompletionException e) {
throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
@@ -225,15 +236,16 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
/** {@inheritDoc} */
@Override
- public <T> ResultSet<T> execute(
+ public <T> ResultSet<T> execute(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
String query,
@Nullable Object... arguments) {
Objects.requireNonNull(query);
try {
- return new SyncResultSetAdapter<>(executeAsync(transaction,
mapper, query, arguments).join());
+ return new SyncResultSetAdapter<>(executeAsync(transaction,
mapper, cancellationToken, query, arguments).join());
} catch (CompletionException e) {
throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
@@ -241,9 +253,10 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
/** {@inheritDoc} */
@Override
- public <T> ResultSet<T> execute(
+ public <T> ResultSet<T> execute(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments) {
Objects.requireNonNull(statement);
@@ -291,26 +304,32 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
String query,
@Nullable Object... arguments
) {
- return executeAsyncInternal(transaction, createStatement(query),
arguments);
+ return executeAsyncInternal(transaction, cancellationToken,
createStatement(query), arguments);
}
/** {@inheritDoc} */
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments
) {
- return executeAsyncInternal(transaction, statement, arguments);
+ return executeAsyncInternal(transaction, cancellationToken, statement,
arguments);
}
/** {@inheritDoc} */
@Override
- public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(@Nullable
Transaction transaction, @Nullable Mapper<T> mapper,
- String query, @Nullable Object... arguments) {
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
+ String query, @Nullable Object... arguments
+ ) {
// TODO: IGNITE-18695.
throw new UnsupportedOperationException("Not implemented yet.");
}
@@ -320,14 +339,17 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
- @Nullable Object... arguments) {
+ @Nullable Object... arguments
+ ) {
// TODO: IGNITE-18695.
throw new UnsupportedOperationException("Not implemented yet.");
}
private CompletableFuture<AsyncResultSet<SqlRow>> executeAsyncInternal(
@Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments
) {
@@ -347,7 +369,12 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
.build();
result = queryProcessor.queryAsync(
- properties, observableTimestampTracker,
(InternalTransaction) transaction, statement.query(), arguments
+ properties,
+ observableTimestampTracker,
+ (InternalTransaction) transaction,
+ cancellationToken,
+ statement.query(),
+ arguments
).thenCompose(cur -> {
if (!busyLock.enterBusy()) {
cur.closeAsync();
@@ -459,7 +486,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
}
try {
- return queryProcessor.queryAsync(properties0,
observableTimestampTracker, transaction, query, args)
+ return queryProcessor.queryAsync(properties0,
observableTimestampTracker, transaction, null, query, args)
.thenCompose(cursor -> {
if (!enterBusy.get()) {
cursor.closeAsync();
@@ -578,8 +605,14 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
.set(QueryProperty.ALLOWED_QUERY_TYPES, SqlQueryType.ALL)
.build());
- CompletableFuture<AsyncSqlCursor<InternalSqlRow>> f =
- queryProcessor.queryAsync(properties0,
observableTimestampTracker, null, query, arguments);
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> f =
queryProcessor.queryAsync(
+ properties0,
+ observableTimestampTracker,
+ null,
+ null,
+ query,
+ arguments
+ );
CompletableFuture<Void> resFut = new CompletableFuture<>();
ScriptHandler handler = new ScriptHandler(resFut, enterBusy,
leaveBusy);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java
index 6d3a33c2b9..24e774c867 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.apache.ignite.internal.thread.PublicApiThreading;
import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
@@ -61,71 +62,87 @@ public class PublicApiThreadingIgniteSql implements
IgniteSql, Wrapper {
}
@Override
- public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String
query, @Nullable Object... arguments) {
- return execUserSyncOperation(() -> sql.execute(transaction, query,
arguments));
+ public ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return execUserSyncOperation(() -> sql.execute(transaction,
cancellationToken, query, arguments));
}
@Override
- public ResultSet<SqlRow> execute(@Nullable Transaction transaction,
Statement statement, @Nullable Object... arguments) {
- return execUserSyncOperation(() -> sql.execute(transaction, statement,
arguments));
+ public ResultSet<SqlRow> execute(
+ @Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return execUserSyncOperation(() -> sql.execute(transaction,
cancellationToken, statement, arguments));
}
@Override
public <T> ResultSet<T> execute(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
String query,
@Nullable Object... arguments
) {
- return execUserSyncOperation(() -> sql.execute(transaction, mapper,
query, arguments));
+ return execUserSyncOperation(() -> sql.execute(transaction, mapper,
cancellationToken, query, arguments));
}
@Override
public <T> ResultSet<T> execute(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments
) {
- return execUserSyncOperation(() -> sql.execute(transaction, mapper,
statement, arguments));
+ return execUserSyncOperation(() -> sql.execute(transaction, mapper,
cancellationToken, statement, arguments));
}
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
String query,
@Nullable Object... arguments
) {
- return doAsyncOperationForResultSet(() ->
sql.executeAsync(transaction, query, arguments));
+ return doAsyncOperationForResultSet(() ->
sql.executeAsync(transaction, cancellationToken, query, arguments));
}
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments
) {
- return doAsyncOperationForResultSet(() ->
sql.executeAsync(transaction, statement, arguments));
+ return doAsyncOperationForResultSet(() ->
sql.executeAsync(transaction, cancellationToken, statement, arguments));
}
@Override
public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
String query,
@Nullable Object... arguments
) {
- return doAsyncOperationForResultSet(() ->
sql.executeAsync(transaction, mapper, query, arguments));
+ return doAsyncOperationForResultSet(() ->
sql.executeAsync(transaction, mapper, cancellationToken, query, arguments));
}
@Override
public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Transaction transaction,
@Nullable Mapper<T> mapper,
+ @Nullable CancellationToken cancellationToken,
Statement statement,
@Nullable Object... arguments
) {
- return doAsyncOperationForResultSet(() ->
sql.executeAsync(transaction, mapper, statement, arguments));
+ return doAsyncOperationForResultSet(() ->
sql.executeAsync(transaction, mapper, cancellationToken, statement, arguments));
}
@Override
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
index 65ce14b2b0..1b1fb5ab66 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java
@@ -88,14 +88,14 @@ public class QueryCancel {
}
/** Returns {@code true} if the cancellation procedure has already been
started. */
- public synchronized boolean isCancelled() {
+ public boolean isCancelled() {
return state.isDone();
}
private static void throwException(Reason reason) {
throw new QueryCancelledException(
reason == Reason.TIMEOUT
- ? QueryCancelledException.TIMEOUT_MSG
+ ? QueryCancelledException.TIMEOUT_MSG
: QueryCancelledException.CANCEL_MSG
);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index 38471488c9..68b30ec977 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -23,6 +23,7 @@ import
org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
import org.apache.ignite.internal.sql.engine.property.SqlProperties;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
@@ -32,21 +33,23 @@ import org.jetbrains.annotations.Nullable;
public interface QueryProcessor extends IgniteComponent {
/**
- * Returns columns and parameters metadata for the given statement.
- * This method uses optional array of parameters to assist with type
inference.
+ * Returns columns and parameters metadata for the given statement. This
method uses optional array of parameters to assist with type
+ * inference.
*
* @param properties User query properties. See {@link QueryProperty} for
available properties.
* @param transaction A transaction to use to resolve a schema.
* @param qry Single statement SQL query.
* @param params Query parameters.
* @return Query metadata.
- *
* @throws IgniteException in case of an error.
* @see QueryProperty
*/
- CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties
properties,
+ CompletableFuture<QueryMetadata> prepareSingleAsync(
+ SqlProperties properties,
@Nullable InternalTransaction transaction,
- String qry, Object... params);
+ String qry,
+ Object... params
+ );
/**
* Execute the query with given schema name and parameters.
@@ -55,6 +58,7 @@ public interface QueryProcessor extends IgniteComponent {
* @param observableTime Tracker of the latest time observed by client.
* @param transaction A transaction to use for query execution. If null,
an implicit transaction
* will be started by provided transactions facade.
+ * @param cancellationToken Cancellation token or {@code null}.
* @param qry SQL query.
* @param params Query parameters.
* @return Sql cursor.
@@ -66,6 +70,7 @@ public interface QueryProcessor extends IgniteComponent {
SqlProperties properties,
HybridTimestampTracker observableTime,
@Nullable InternalTransaction transaction,
+ @Nullable CancellationToken cancellationToken,
String qry,
Object... params
);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 0c7f9047e8..3174d51c8c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -101,6 +101,7 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -407,6 +408,7 @@ public class SqlQueryProcessor implements QueryProcessor,
SystemViewProvider {
SqlProperties properties,
HybridTimestampTracker observableTimeTracker,
@Nullable InternalTransaction transaction,
+ @Nullable CancellationToken cancellationToken,
String qry,
Object... params
) {
@@ -418,7 +420,13 @@ public class SqlQueryProcessor implements QueryProcessor,
SystemViewProvider {
QueryTransactionContext txContext = new
QueryTransactionContextImpl(txManager, observableTimeTracker, transaction,
txTracker);
- return queryExecutor.executeQuery(properties, txContext, qry,
params);
+ return queryExecutor.executeQuery(
+ properties,
+ txContext,
+ qry,
+ cancellationToken,
+ params
+ );
} finally {
busyLock.leaveBusy();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 229cd17c86..f582767d09 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
+import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.TopologyEventHandler;
@@ -304,7 +305,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
assert cancelHandler != null;
- // This call triggers a timeout exception, if operation has timed out.
+ // This call immediately triggers a cancellation exception if
operation has timed out or it has already been cancelled.
cancelHandler.add(timeout -> {
QueryCompletionReason reason = timeout ?
QueryCompletionReason.TIMEOUT : QueryCompletionReason.CANCEL;
queryManager.close(reason);
@@ -463,11 +464,9 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
assert queryCancel != null;
queryCancel.add(timeout -> {
- if (!timeout) {
- return;
+ if (timeout) {
+ ret.completeExceptionally(new
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG));
}
-
- ret.completeExceptionally(new
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG));
});
return new IteratorToDataCursorAdapter<>(ret, Runnable::run);
@@ -976,6 +975,22 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private AsyncCursor<InternalSqlRow> execute(InternalTransaction tx,
MultiStepPlan multiStepPlan) {
assert root != null;
+ // Query has already been cancelled, return immediately.
+ if (cancelled.get()) {
+ return new AsyncCursor<>() {
+ @Override
+ public CompletableFuture<BatchedResult<InternalSqlRow>>
requestNextAsync(int rows) {
+ Throwable t =
SqlExceptionMapperUtil.mapToPublicSqlException(new QueryCancelledException());
+ return CompletableFuture.failedFuture(t);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return DistributedQueryManager.this.cancelFut;
+ }
+ };
+ }
+
boolean mapOnBackups = tx.isReadOnly();
MappingParameters mappingParameters =
MappingParameters.create(ctx.parameters(), mapOnBackups);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
index 6154a57958..b7a6414a92 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
@@ -38,7 +38,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Represents a query initiated on current node.
- *
+ *
* <p>Encapsulates intermediate state populated throughout query lifecycle.
*/
class Query implements Runnable {
@@ -170,7 +170,7 @@ class Query implements Runnable {
return onPhaseStartedCallback.computeIfAbsent(phase, k -> new
CompletableFuture<>());
}
- /** Moves the query to a given state. */
+ /** Moves the query to a given state. */
void moveTo(ExecutionPhase newPhase) {
synchronized (mux) {
assert currentPhase.transitionAllowed(newPhase) : "currentPhase="
+ currentPhase + ", newPhase=" + newPhase;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
index c955914834..6386895cdb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
@@ -55,6 +55,8 @@ import
org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
import org.apache.ignite.internal.sql.engine.util.cache.Cache;
import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.CancelHandleHelper;
+import org.apache.ignite.lang.CancellationToken;
import org.jetbrains.annotations.Nullable;
/**
@@ -128,6 +130,7 @@ public class QueryExecutor implements LifecycleAware {
* @param properties User query properties. See {@link QueryProperty} for
available properties.
* @param txContext Transactional context to use.
* @param sql Query string.
+ * @param cancellationToken Cancellation token or {@code null}.
* @param params Query parameters.
* @return Future which will be completed with cursor.
*/
@@ -135,7 +138,8 @@ public class QueryExecutor implements LifecycleAware {
SqlProperties properties,
QueryTransactionContext txContext,
String sql,
- Object... params
+ @Nullable CancellationToken cancellationToken,
+ Object[] params
) {
SqlProperties properties0 = SqlPropertiesHelper.chain(properties,
defaultProperties);
@@ -155,7 +159,7 @@ public class QueryExecutor implements LifecycleAware {
}
try {
- trackQuery(query);
+ trackQuery(query, cancellationToken);
} finally {
busyLock.leaveBusy();
}
@@ -199,7 +203,7 @@ public class QueryExecutor implements LifecycleAware {
}
try {
- trackQuery(query);
+ trackQuery(query, null);
} finally {
busyLock.leaveBusy();
}
@@ -310,13 +314,19 @@ public class QueryExecutor implements LifecycleAware {
);
}
- private void trackQuery(Query query) {
+ private void trackQuery(Query query, @Nullable CancellationToken
cancellationToken) {
Query old = runningQueries.put(query.id, query);
assert old == null : "Query with the same id already registered";
- query.onPhaseStarted(ExecutionPhase.TERMINATED)
- .whenComplete((ignored, ex) ->
runningQueries.remove(query.id));
+ CompletableFuture<Void> queryTerminationFut =
query.onPhaseStarted(ExecutionPhase.TERMINATED);
+ CompletableFuture<Void> queryTerminationDoneFut =
queryTerminationFut.whenComplete((ignored, ex) -> {
+ runningQueries.remove(query.id);
+ });
+
+ if (cancellationToken != null) {
+ CancelHandleHelper.addCancelAction(cancellationToken,
query.cancel::cancel, queryTerminationDoneFut);
+ }
}
/** Returns list of queries registered on server at the moment. */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 690c23d767..7259223588 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -231,8 +231,6 @@ public class PrepareServiceImpl implements PrepareService {
return CompletableFuture.failedFuture(new
SchemaNotFoundException(schemaName));
}
- // Add an action to trigger planner timeout, when operation times out.
- // Or trigger timeout immediately if operation has already timed out.
QueryCancel cancelHandler = operationContext.cancel();
assert cancelHandler != null;
boolean explicitTx = operationContext.txContext() != null &&
operationContext.txContext().explicitTx() != null;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
index 198bfedff5..3d7dc79a64 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
@@ -98,7 +98,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
when(result.onClose())
.thenReturn(closeFuture);
- when(queryProcessor.queryAsync(any(), any(), any(), any(),
any(Object[].class)))
+ when(queryProcessor.queryAsync(any(), any(), any(), any(), any(),
any(Object[].class)))
.thenReturn(completedFuture(result));
AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1"));
@@ -121,7 +121,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
when(result.onClose()).thenReturn(new CompletableFuture<>());
when(result.closeAsync()).thenReturn(nullCompletedFuture());
- when(queryProcessor.queryAsync(any(), any(), any(), any(),
any(Object[].class)))
+ when(queryProcessor.queryAsync(any(), any(), any(), any(), any(),
any(Object[].class)))
.thenReturn(completedFuture(result));
AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1"));
@@ -139,7 +139,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
void resultSetIsNotCreatedIfComponentIsStoppedInMiddleOfOperation() throws
Exception {
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture = new
CompletableFuture<>();
CountDownLatch executeQueryLatch = new CountDownLatch(1);
- when(queryProcessor.queryAsync(any(), any(), any(), any(),
any(Object[].class)))
+ when(queryProcessor.queryAsync(any(), any(), any(), any(), any(),
any(Object[].class)))
.thenAnswer(ignored -> {
executeQueryLatch.countDown();
return cursorFuture;
@@ -175,7 +175,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture = new
CompletableFuture<>();
CountDownLatch executeQueryLatch = new CountDownLatch(3);
- when(queryProcessor.queryAsync(any(), any(), any(), any(),
any(Object[].class)))
+ when(queryProcessor.queryAsync(any(), any(), any(), any(), any(),
any(Object[].class)))
.thenAnswer(ignored -> {
executeQueryLatch.countDown();
@@ -206,7 +206,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
() -> await(result)
);
assertThat(igniteSql.openedCursors(), empty());
- verify(queryProcessor, times(3)).queryAsync(any(), any(), any(),
any(), any(Object[].class));
+ verify(queryProcessor, times(3)).queryAsync(any(), any(), any(),
any(), any(), any(Object[].class));
verify(cursor).closeAsync();
}
@@ -232,7 +232,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
when(result.onClose())
.thenReturn(closeFuture);
- when(queryProcessor.queryAsync(any(), any(), any(), any(),
any(Object[].class)))
+ when(queryProcessor.queryAsync(any(), any(), any(), any(), any(),
any(Object[].class)))
.thenReturn(completedFuture(result));
AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1"));
@@ -264,7 +264,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
when(cursor2.hasNextResult()).thenReturn(false);
when(cursor2.closeAsync()).thenReturn(nullCompletedFuture());
- when(queryProcessor.queryAsync(any(), any(), any(), any(),
any(Object[].class)))
+ when(queryProcessor.queryAsync(any(), any(), any(), any(), any(),
any(Object[].class)))
.thenReturn(completedFuture(cursor1));
Void rs = await(igniteSql.executeScriptAsync("SELECT 1; SELECT 2"));
@@ -281,7 +281,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
when(cursor1.nextResult()).thenReturn(CompletableFuture.failedFuture(new
RuntimeException("Broken")));
when(cursor1.closeAsync()).thenReturn(nullCompletedFuture());
- when(queryProcessor.queryAsync(any(), any(), any(), any(),
any(Object[].class)))
+ when(queryProcessor.queryAsync(any(), any(), any(), any(), any(),
any(Object[].class)))
.thenReturn(completedFuture(cursor1));
assertThrowsSqlException(
@@ -310,7 +310,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
when(cursor2.nextResult()).thenReturn(CompletableFuture.failedFuture(lastCursorScriptException));
when(cursor2.closeAsync()).thenReturn(CompletableFuture.failedFuture(cursorCloseException2));
- when(queryProcessor.queryAsync(any(), any(), any(), any(),
any(Object[].class)))
+ when(queryProcessor.queryAsync(any(), any(), any(), any(), any(),
any(Object[].class)))
.thenReturn(completedFuture(cursor1));
SqlException sqlEx = assertThrowsExactly(SqlException.class, () ->
await(igniteSql.executeScriptAsync("SELECT 1; SELECT 2")));
@@ -336,7 +336,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest {
when(cursor2.closeAsync()).thenReturn(nullCompletedFuture());
- when(queryProcessor.queryAsync(any(), any(), any(), any(),
any(Object[].class)))
+ when(queryProcessor.queryAsync(any(), any(), any(), any(), any(),
any(Object[].class)))
.thenReturn(completedFuture(cursor1));
Thread thread = new Thread(() -> {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
index 6d213cbee0..c26427b329 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java
@@ -45,6 +45,7 @@ import
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.lang.CancellationToken;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -120,8 +121,12 @@ public class TransactionEnlistTest extends
BaseIgniteAbstractTest {
}
@Override
- public CompletableFuture<QueryMetadata>
prepareSingleAsync(SqlProperties properties,
- @Nullable InternalTransaction transaction, String qry,
Object... params) {
+ public CompletableFuture<QueryMetadata> prepareSingleAsync(
+ SqlProperties properties,
+ @Nullable InternalTransaction transaction,
+ String qry,
+ Object... params
+ ) {
assert params == null || params.length == 0 : "params are not
supported";
assert prepareOnly : "Expected that the query will be executed";
@@ -135,6 +140,7 @@ public class TransactionEnlistTest extends
BaseIgniteAbstractTest {
SqlProperties properties,
HybridTimestampTracker observableTimeTracker,
@Nullable InternalTransaction transaction,
+ @Nullable CancellationToken cancellationToken,
String qry,
Object... params
) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 0eb50b9a33..61abdcbd3b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -88,6 +88,7 @@ import
org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import
org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl;
import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
@@ -511,6 +512,9 @@ public class TestBuilders {
/** Sets the dynamic parameters this fragment will be executed with. */
ExecutionContextBuilder dynamicParameters(Object... params);
+ /** Sets the query cancellation procedure. */
+ ExecutionContextBuilder queryCancel(QueryCancel queryCancel);
+
/**
* Builds the context object.
*
@@ -526,6 +530,7 @@ public class TestBuilders {
private QueryTaskExecutor executor = null;
private ClusterNode node = null;
private Object[] dynamicParams = ArrayUtils.OBJECT_EMPTY_ARRAY;
+ private QueryCancel queryCancel = null;
/** {@inheritDoc} */
@Override
@@ -559,12 +564,20 @@ public class TestBuilders {
return this;
}
+ /** {@inheritDoc} */
@Override
public ExecutionContextBuilder dynamicParameters(Object... params) {
this.dynamicParams = params;
return this;
}
+ /** {@inheritDoc} */
+ @Override
+ public ExecutionContextBuilder queryCancel(QueryCancel queryCancel) {
+ this.queryCancel = queryCancel;
+ return this;
+ }
+
/** {@inheritDoc} */
@Override
public ExecutionContext<Object[]> build() {
@@ -578,7 +591,7 @@ public class TestBuilders {
Commons.parametersMap(dynamicParams),
TxAttributes.fromTx(new NoOpTransaction(node.name())),
SqlQueryProcessor.DEFAULT_TIME_ZONE_ID,
- null
+ queryCancel
);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
index 7cbdc2ab80..75d4a8ddbd 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnType;
import org.jetbrains.annotations.Nullable;
@@ -304,8 +305,12 @@ public class QueryCheckerTest extends
BaseIgniteAbstractTest {
}
@Override
- public CompletableFuture<QueryMetadata>
prepareSingleAsync(SqlProperties properties,
- @Nullable InternalTransaction transaction, String qry,
Object... params) {
+ public CompletableFuture<QueryMetadata> prepareSingleAsync(
+ SqlProperties properties,
+ @Nullable InternalTransaction transaction,
+ String qry,
+ Object... params
+ ) {
assert params == null || params.length == 0 : "params are not
supported";
assert prepareOnly : "Expected that the query will be executed";
@@ -319,6 +324,7 @@ public class QueryCheckerTest extends
BaseIgniteAbstractTest {
SqlProperties properties,
HybridTimestampTracker observableTimeTracker,
@Nullable InternalTransaction transaction,
+ @Nullable CancellationToken cancellationToken,
String qry,
Object... params
) {
diff --git
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java
index 56dd52f50b..2c7ec723e2 100644
---
a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java
+++
b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java
@@ -327,7 +327,13 @@ abstract class QueryCheckerImpl implements QueryChecker {
if (!CollectionUtils.nullOrEmpty(planMatchers)) {
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> explainCursors =
qryProc.queryAsync(
- properties, observableTimeTracker(), tx, "EXPLAIN PLAN FOR
" + qry, params);
+ properties,
+ observableTimeTracker(),
+ tx,
+ null,
+ "EXPLAIN PLAN FOR " + qry,
+ params
+ );
AsyncSqlCursor<InternalSqlRow> explainCursor =
await(explainCursors);
List<InternalSqlRow> explainRes = getAllFromCursor(explainCursor);
@@ -353,7 +359,7 @@ abstract class QueryCheckerImpl implements QueryChecker {
// Check result.
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursors =
- bypassingThreadAssertionsAsync(() ->
qryProc.queryAsync(properties, observableTimeTracker(), tx, qry, params));
+ bypassingThreadAssertionsAsync(() ->
qryProc.queryAsync(properties, observableTimeTracker(), tx, null, qry, params));
AsyncSqlCursor<InternalSqlRow> cur = await(cursors);