This is an automated email from the ASF dual-hosted git repository. korlov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new ec70aea639 IGNITE-23430: Sql. Provide an ability to cancel query before first page ready (#4615) ec70aea639 is described below commit ec70aea639c184dd883d9fc09d050514764d93fe Author: Max Zhuravkov <shh...@gmail.com> AuthorDate: Mon Nov 11 08:22:46 2024 +0200 IGNITE-23430: Sql. Provide an ability to cancel query before first page ready (#4615) --- .../java/org/apache/ignite/lang/CancelHandle.java | 62 +++++ .../org/apache/ignite/lang/CancelHandleImpl.java | 163 ++++++++++++ .../org/apache/ignite/lang/CancellationToken.java} | 13 +- .../main/java/org/apache/ignite/sql/IgniteSql.java | 189 ++++++++++++- .../client/handler/JdbcQueryEventHandlerImpl.java | 2 + .../requests/sql/ClientSqlExecuteRequest.java | 9 +- .../handler/JdbcQueryEventHandlerImplTest.java | 4 +- .../ignite/internal/client/sql/ClientSql.java | 41 ++- .../client/fakes/FakeIgniteQueryProcessor.java | 10 +- .../apache/ignite/internal/util/Cancellable.java | 1 + .../org/apache/ignite/lang/CancelHandleHelper.java | 71 +++++ .../ignite/lang/CancelHandleHelperSelfTest.java | 292 +++++++++++++++++++++ .../benchmark/AbstractMultiNodeBenchmark.java | 4 +- .../ignite/internal/benchmark/SelectBenchmark.java | 4 +- .../benchmark/SqlMultiStatementBenchmark.java | 4 +- .../internal/restart/RestartProofIgniteSql.java | 76 +++++- .../internal/sql/api/ItSqlAsynchronousApiTest.java | 98 +++++++ .../sql/api/ItSqlClientAsynchronousApiTest.java | 13 + .../sql/api/ItSqlClientSynchronousApiTest.java | 13 + .../internal/sql/api/ItSqlSynchronousApiTest.java | 88 +++++++ .../sql/engine/BaseSqlMultiStatementTest.java | 2 +- .../internal/sql/engine/ItQueryCancelTest.java | 255 ++++++++++++++++++ .../ignite/internal/sql/api/IgniteSqlImpl.java | 65 +++-- .../sql/api/PublicApiThreadingIgniteSql.java | 37 ++- .../ignite/internal/sql/engine/QueryCancel.java | 4 +- .../ignite/internal/sql/engine/QueryProcessor.java | 15 +- 
.../internal/sql/engine/SqlQueryProcessor.java | 10 +- .../sql/engine/exec/ExecutionServiceImpl.java | 25 +- .../ignite/internal/sql/engine/exec/fsm/Query.java | 4 +- .../sql/engine/exec/fsm/QueryExecutor.java | 22 +- .../sql/engine/prepare/PrepareServiceImpl.java | 2 - .../ignite/internal/sql/api/IgniteSqlImplTest.java | 20 +- .../sql/engine/exec/TransactionEnlistTest.java | 10 +- .../sql/engine/framework/TestBuilders.java | 15 +- .../internal/sql/engine/util/QueryCheckerTest.java | 10 +- .../internal/sql/engine/util/QueryCheckerImpl.java | 10 +- 36 files changed, 1546 insertions(+), 117 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/lang/CancelHandle.java b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandle.java new file mode 100644 index 0000000000..1f81df251b --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandle.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.lang; + +import java.util.concurrent.CompletableFuture; + +/** + * A handle which may be used to request the cancellation of execution. + */ +public interface CancelHandle { + + /** A factory method to create a handle. 
*/ + static CancelHandle create() { + return new CancelHandleImpl(); + } + + /** + * Abruptly terminates an execution of an associated process. + * + * <p>Control flow will return after the process has been terminated and the resources associated with that process have been freed. + */ + void cancel(); + + /** + * Abruptly terminates an execution of an associated process. + * + * @return A future that will be completed after the process has been terminated and the resources associated with that process have + * been freed. + */ + CompletableFuture<Void> cancelAsync(); + + /** + * Flag indicating whether cancellation was requested or not. + * + * <p>This method will return true even if cancellation has not been completed yet. + * + * @return {@code true} when cancellation was requested. + */ + boolean isCancelled(); + + /** + * Issue a token associated with this handle. + * + * <p>Token is reusable, meaning the same token may be used to link several executions into a single cancellable. + */ + CancellationToken token(); +} diff --git a/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java new file mode 100644 index 0000000000..a28c97e3d0 --- /dev/null +++ b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.lang; + +import java.util.ArrayDeque; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.lang.ErrorGroups.Common; + +/** Implementation of {@link CancelHandle}. */ +final class CancelHandleImpl implements CancelHandle { + + private final CompletableFuture<Void> cancelFut = new CompletableFuture<>(); + + private final CancellationTokenImpl token; + + CancelHandleImpl() { + this.token = new CancellationTokenImpl(this); + } + + /** {@inheritDoc} */ + @Override + public void cancel() { + doCancelAsync(); + + cancelFut.join(); + } + + /** {@inheritDoc} */ + @Override + public CompletableFuture<Void> cancelAsync() { + doCancelAsync(); + + // Make a copy of internal future, so that it is not possible to complete it + return cancelFut.copy(); + } + + /** {@inheritDoc} */ + @Override + public boolean isCancelled() { + return token.isCancelled(); + } + + /** {@inheritDoc} */ + @Override + public CancellationToken token() { + return token; + } + + private void doCancelAsync() { + token.cancel(); + } + + static final class CancellationTokenImpl implements CancellationToken { + + private final ArrayDeque<Cancellation> cancellations = new ArrayDeque<>(); + + private final CancelHandleImpl handle; + + private final Object mux = new Object(); + + private volatile CompletableFuture<Void> cancelFut; + + CancellationTokenImpl(CancelHandleImpl handle) { + this.handle = handle; + } + + void addCancelAction(Runnable action, CompletableFuture<?> fut) { + Cancellation cancellation = new Cancellation(action, fut); + + if 
(cancelFut != null) { + cancellation.run(); + } else { + synchronized (mux) { + if (cancelFut == null) { + cancellations.add(cancellation); + return; + } + } + + cancellation.run(); + } + } + + boolean isCancelled() { + return cancelFut != null; + } + + @SuppressWarnings("rawtypes") + void cancel() { + if (cancelFut != null) { + return; + } + + synchronized (mux) { + if (cancelFut != null) { + return; + } + + // First assemble all completion futures + CompletableFuture[] futures = cancellations.stream() + .map(c -> c.completionFut) + .toArray(CompletableFuture[]::new); + + // handle.cancelFut completes when all cancellation futures complete. + cancelFut = CompletableFuture.allOf(futures).whenComplete((r, t) -> { + handle.cancelFut.complete(null); + }); + } + + IgniteException error = null; + + // Run cancellation actions outside of lock + for (Cancellation cancellation : cancellations) { + try { + cancellation.run(); + } catch (Throwable t) { + if (error == null) { + error = new IgniteException(Common.INTERNAL_ERR, "Failed to cancel an operation"); + } + error.addSuppressed(t); + } + } + + if (error != null) { + throw error; + } + } + } + + /** + * Stores an action that triggers a cancellation and a completable future that completes when a resource is closed. 
+ */ + private static class Cancellation { + + private final Runnable cancelAction; + + private final CompletableFuture<?> completionFut; + + private Cancellation(Runnable cancelAction, CompletableFuture<?> completionFut) { + this.cancelAction = cancelAction; + this.completionFut = completionFut; + } + + private void run() { + cancelAction.run(); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java b/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java similarity index 68% copy from modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java copy to modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java index 5f6cd721e1..52857cd317 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java +++ b/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java @@ -15,16 +15,11 @@ * limitations under the License. */ -package org.apache.ignite.internal.util; +package org.apache.ignite.lang; /** - * A {@code Cancellable} represents a process or an operation that can be canceled. + * Cancellation token is an object that is issued by {@link CancelHandle} and can be used by an operation or a resource to observe a signal + * to terminate it. */ -public interface Cancellable { - /** - * Cancels the ongoing operation or process or do nothing if it has been already cancelled. - * - * @param timeout If process was cancelled due to timeout. 
- */ - void cancel(boolean timeout); +public interface CancellationToken { } diff --git a/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java b/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java index d2beea304d..fecd4bfca1 100644 --- a/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java +++ b/modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java @@ -18,6 +18,7 @@ package org.apache.ignite.sql; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.sql.async.AsyncResultSet; import org.apache.ignite.table.mapper.Mapper; import org.apache.ignite.tx.Transaction; @@ -52,7 +53,26 @@ public interface IgniteSql { * @return SQL query result set. * @throws SqlException If failed. */ - ResultSet<SqlRow> execute(@Nullable Transaction transaction, String query, @Nullable Object... arguments); + default ResultSet<SqlRow> execute(@Nullable Transaction transaction, String query, @Nullable Object... arguments) { + return execute(transaction, (CancellationToken) null, query, arguments); + } + + /** + * Executes a single SQL query. + * + * @param transaction Transaction to execute the query within or {@code null}. + * @param cancellationToken Cancellation token or {@code null}. + * @param query SQL query template. + * @param arguments Arguments for the template (optional). + * @return SQL query result set. + * @throws SqlException If failed. + */ + ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + String query, + @Nullable Object... arguments + ); /** * Executes a single SQL statement. @@ -62,12 +82,54 @@ public interface IgniteSql { * @param arguments Arguments for the statement. * @return SQL query result set. */ - ResultSet<SqlRow> execute(@Nullable Transaction transaction, Statement statement, @Nullable Object... 
arguments); + default ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + Statement statement, + @Nullable Object... arguments + ) { + return execute(transaction, (CancellationToken) null, statement, arguments); + } + + /** + * Executes a single SQL statement. + * + * @param transaction Transaction to execute the statement within or {@code null}. + * @param cancellationToken Cancellation token or {@code null}. + * @param statement SQL statement to execute. + * @param arguments Arguments for the statement. + * @return SQL query result set. + */ + ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + Statement statement, + @Nullable Object... arguments + ); + + /** + * Executes single SQL statement and maps results to objects with the provided mapper. + * + * @param transaction Transaction to execute the statement within or {@code null}. + * @param mapper Mapper that defines the row type and the way to map columns to the type members. See {@link Mapper#of}. + * @param query SQL query template. + * @param arguments Arguments for the statement. + * @param <T> A type of object contained in result set. + * @return SQL query results set. + */ + default <T> ResultSet<T> execute( + @Nullable Transaction transaction, + @Nullable Mapper<T> mapper, + String query, + @Nullable Object... arguments + ) { + return execute(transaction, mapper, null, query, arguments); + } /** * Executes single SQL statement and maps results to objects with the provided mapper. * * @param transaction Transaction to execute the statement within or {@code null}. + * @param cancellationToken Cancellation token or {@code null}. * @param mapper Mapper that defines the row type and the way to map columns to the type members. See {@link Mapper#of}. * @param query SQL query template. * @param arguments Arguments for the statement. 
@@ -77,8 +139,10 @@ public interface IgniteSql { <T> ResultSet<T> execute( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, String query, - @Nullable Object... arguments); + @Nullable Object... arguments + ); /** * Executes single SQL statement and maps results to objects with the provided mapper. @@ -90,27 +154,128 @@ public interface IgniteSql { * @param <T> A type of object contained in result set. * @return SQL query results set. */ + default <T> ResultSet<T> execute( + @Nullable Transaction transaction, + @Nullable Mapper<T> mapper, + Statement statement, + @Nullable Object... arguments + ) { + return execute(transaction, mapper, null, statement, arguments); + } + + /** + * Executes single SQL statement and maps results to objects with the provided mapper. + * + * @param transaction Transaction to execute the statement within or {@code null}. + * @param cancellationToken Cancellation token or {@code null}. + * @param mapper Mapper that defines the row type and the way to map columns to the type members. See {@link Mapper#of}. + * @param statement SQL statement to execute. + * @param arguments Arguments for the statement. + * @param <T> A type of object contained in result set. + * @return SQL query results set. + */ <T> ResultSet<T> execute( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, Statement statement, - @Nullable Object... arguments); + @Nullable Object... arguments + ); + + /** + * Executes SQL query in an asynchronous way. + * + * @param transaction Transaction to execute the query within or {@code null}. + * @param query SQL query template. + * @param arguments Arguments for the template (optional). + * @return Operation future. + * @throws SqlException If failed. + */ + default CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( + @Nullable Transaction transaction, + String query, + @Nullable Object... 
arguments + ) { + return executeAsync(transaction, (CancellationToken) null, query, arguments); + } /** * Executes SQL query in an asynchronous way. * * @param transaction Transaction to execute the query within or {@code null}. + * @param cancellationToken Cancellation token or {@code null}. * @param query SQL query template. * @param arguments Arguments for the template (optional). * @return Operation future. * @throws SqlException If failed. */ - CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(@Nullable Transaction transaction, String query, @Nullable Object... arguments); + CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + String query, + @Nullable Object... arguments + ); + + /** + * Executes an SQL statement asynchronously. + * + * @param transaction Transaction to execute the statement within or {@code null}. + * @param statement SQL statement to execute. + * @param arguments Arguments for the statement. + * @return Operation future. + * @throws SqlException If failed. + */ + default CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( + @Nullable Transaction transaction, + Statement statement, + @Nullable Object... arguments + ) { + return executeAsync(transaction, (CancellationToken) null, statement, arguments); + } + + /** + * Executes SQL statement in an asynchronous way and maps results to objects with the provided mapper. + * + * @param transaction Transaction to execute the statement within or {@code null}. + * @param mapper Mapper that defines the row type and the way to map columns to the type members. See {@link Mapper#of}. + * @param query SQL query template. + * @param arguments Arguments for the statement. + * @param <T> A type of object contained in result set. + * @return Operation future. 
+ */ + default <T> CompletableFuture<AsyncResultSet<T>> executeAsync( + @Nullable Transaction transaction, + @Nullable Mapper<T> mapper, + String query, + @Nullable Object... arguments + ) { + return executeAsync(transaction, mapper, null, query, arguments); + } + + /** + * Executes SQL statement in an asynchronous way and maps results to objects with the provided mapper. + * + * @param transaction Transaction to execute the statement within or {@code null}. + * @param mapper Mapper that defines the row type and the way to map columns to the type members. See {@link Mapper#of}. + * @param statement SQL statement to execute. + * @param arguments Arguments for the statement. + * @param <T> A type of object contained in result set. + * @return Operation future. + */ + default <T> CompletableFuture<AsyncResultSet<T>> executeAsync( + @Nullable Transaction transaction, + @Nullable Mapper<T> mapper, + Statement statement, + @Nullable Object... arguments + ) { + return executeAsync(transaction, mapper, null, statement, arguments); + } /** * Executes an SQL statement asynchronously. * * @param transaction Transaction to execute the statement within or {@code null}. + * @param cancellationToken Cancellation token or {@code null}. * @param statement SQL statement to execute. * @param arguments Arguments for the statement. * @return Operation future. @@ -118,13 +283,16 @@ public interface IgniteSql { */ CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, Statement statement, - @Nullable Object... arguments); + @Nullable Object... arguments + ); /** * Executes SQL statement in an asynchronous way and maps results to objects with the provided mapper. * * @param transaction Transaction to execute the statement within or {@code null}. + * @param cancellationToken Cancellation token or {@code null}. * @param mapper Mapper that defines the row type and the way to map columns to the type members. 
See {@link Mapper#of}. * @param query SQL query template. * @param arguments Arguments for the statement. @@ -134,13 +302,16 @@ public interface IgniteSql { <T> CompletableFuture<AsyncResultSet<T>> executeAsync( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, String query, - @Nullable Object... arguments); + @Nullable Object... arguments + ); /** * Executes SQL statement in an asynchronous way and maps results to objects with the provided mapper. * * @param transaction Transaction to execute the statement within or {@code null}. + * @param cancellationToken Cancellation token or {@code null}. * @param mapper Mapper that defines the row type and the way to map columns to the type members. See {@link Mapper#of}. * @param statement SQL statement to execute. * @param arguments Arguments for the statement. @@ -150,8 +321,10 @@ public interface IgniteSql { <T> CompletableFuture<AsyncResultSet<T>> executeAsync( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, Statement statement, - @Nullable Object... arguments); + @Nullable Object... arguments + ); /** * Executes a batched SQL query. Only DML queries are supported. diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java index a73bad6915..75a005a7c7 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java @@ -154,6 +154,7 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu properties, igniteTransactions.observableTimestampTracker(), tx, + null, req.sqlQuery(), req.arguments() == null ? 
OBJECT_EMPTY_ARRAY : req.arguments() ); @@ -280,6 +281,7 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu properties, igniteTransactions.observableTimestampTracker(), tx, + null, sql, arg == null ? OBJECT_EMPTY_ARRAY : arg ); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java index c3fa0795fe..1cae217b90 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java @@ -167,8 +167,13 @@ public class ClientSqlExecuteRequest { .build(); CompletableFuture<AsyncResultSet<SqlRow>> fut = qryProc.queryAsync( - properties, transactions.observableTimestampTracker(), (InternalTransaction) transaction, query, arguments) - .thenCompose(cur -> cur.requestNextAsync(pageSize) + properties, + transactions.observableTimestampTracker(), + (InternalTransaction) transaction, + null, + query, + arguments + ).thenCompose(cur -> cur.requestNextAsync(pageSize) .thenApply( batchRes -> new AsyncResultSetImpl<>( cur, diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java index d26a857dee..7ece7f2ccb 100644 --- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java +++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImplTest.java @@ -162,7 +162,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest { @Test public void singleTxUsedForMultipleOperations() { - when(queryProcessor.queryAsync(any(), any(), any(), any(), 
any(Object[].class))) + when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), any(Object[].class))) .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Expected"))); InternalTransaction tx = mock(InternalTransaction.class); @@ -196,7 +196,7 @@ class JdbcQueryEventHandlerImplTest extends BaseIgniteAbstractTest { verify(igniteTransactions, times(5)).observableTimestampTracker(); verifyNoMoreInteractions(igniteTransactions); - verify(queryProcessor, times(5)).queryAsync(any(), any(), any(), any(), any(Object[].class)); + verify(queryProcessor, times(5)).queryAsync(any(), any(), any(), any(), any(), any(Object[].class)); } private long acquireConnectionId() { diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java index 437b945c4d..60e5175a13 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.sql.StatementBuilderImpl; import org.apache.ignite.internal.sql.StatementImpl; import org.apache.ignite.internal.sql.SyncResultSetAdapter; import org.apache.ignite.internal.util.ExceptionUtils; +import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSet; @@ -91,11 +92,16 @@ public class ClientSql implements IgniteSql { /** {@inheritDoc} */ @Override - public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String query, @Nullable Object... arguments) { + public ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + String query, + @Nullable Object... 
arguments + ) { Objects.requireNonNull(query); try { - return new SyncResultSetAdapter<>(executeAsync(transaction, query, arguments).join()); + return new SyncResultSetAdapter<>(executeAsync(transaction, cancellationToken, query, arguments).join()); } catch (CompletionException e) { throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); } @@ -103,11 +109,16 @@ public class ClientSql implements IgniteSql { /** {@inheritDoc} */ @Override - public ResultSet<SqlRow> execute(@Nullable Transaction transaction, Statement statement, @Nullable Object... arguments) { + public ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + Statement statement, + @Nullable Object... arguments + ) { Objects.requireNonNull(statement); try { - return new SyncResultSetAdapter<>(executeAsync(transaction, statement, arguments).join()); + return new SyncResultSetAdapter<>(executeAsync(transaction, cancellationToken, statement, arguments).join()); } catch (CompletionException e) { throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); } @@ -118,12 +129,14 @@ public class ClientSql implements IgniteSql { public <T> ResultSet<T> execute( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, String query, - @Nullable Object... arguments) { + @Nullable Object... 
arguments + ) { Objects.requireNonNull(query); try { - return new SyncResultSetAdapter<>(executeAsync(transaction, mapper, query, arguments).join()); + return new SyncResultSetAdapter<>(executeAsync(transaction, mapper, cancellationToken, query, arguments).join()); } catch (CompletionException e) { throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); } @@ -134,12 +147,14 @@ public class ClientSql implements IgniteSql { public <T> ResultSet<T> execute( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, Statement statement, - @Nullable Object... arguments) { + @Nullable Object... arguments + ) { Objects.requireNonNull(statement); try { - return new SyncResultSetAdapter<>(executeAsync(transaction, mapper, statement, arguments).join()); + return new SyncResultSetAdapter<>(executeAsync(transaction, mapper, cancellationToken, statement, arguments).join()); } catch (CompletionException e) { throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); } @@ -177,22 +192,24 @@ public class ClientSql implements IgniteSql { @Override public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments) { Objects.requireNonNull(query); StatementImpl statement = new StatementImpl(query); - return executeAsync(transaction, statement, arguments); + return executeAsync(transaction, cancellationToken, statement, arguments); } /** {@inheritDoc} */ @Override public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... 
arguments) { - return executeAsync(transaction, sqlRowMapper, statement, arguments); + return executeAsync(transaction, sqlRowMapper, cancellationToken, statement, arguments); } /** {@inheritDoc} */ @@ -200,13 +217,14 @@ public class ClientSql implements IgniteSql { public <T> CompletableFuture<AsyncResultSet<T>> executeAsync( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments) { Objects.requireNonNull(query); StatementImpl statement = new StatementImpl(query); - return executeAsync(transaction, mapper, statement, arguments); + return executeAsync(transaction, mapper, cancellationToken, statement, arguments); } /** {@inheritDoc} */ @@ -214,6 +232,7 @@ public class ClientSql implements IgniteSql { public <T> CompletableFuture<AsyncResultSet<T>> executeAsync( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... 
arguments) { Objects.requireNonNull(statement); diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java index 6163a589bb..6c5eef1132 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.sql.engine.property.SqlProperties; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.sql.SqlException; import org.jetbrains.annotations.Nullable; @@ -44,8 +45,12 @@ public class FakeIgniteQueryProcessor implements QueryProcessor { String lastScript; @Override - public CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties properties, - @Nullable InternalTransaction transaction, String qry, Object... params) { + public CompletableFuture<QueryMetadata> prepareSingleAsync( + SqlProperties properties, + @Nullable InternalTransaction transaction, + String qry, + Object... params + ) { throw new UnsupportedOperationException(); } @@ -54,6 +59,7 @@ public class FakeIgniteQueryProcessor implements QueryProcessor { SqlProperties properties, HybridTimestampTracker observableTimeTracker, @Nullable InternalTransaction transaction, + @Nullable CancellationToken cancellationToken, String qry, Object... 
params ) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java b/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java index 5f6cd721e1..f5d59fc717 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/Cancellable.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.util; /** * A {@code Cancellable} represents a process or an operation that can be canceled. */ +@FunctionalInterface public interface Cancellable { /** * Cancels the ongoing operation or process or do nothing if it has been already cancelled. diff --git a/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java new file mode 100644 index 0000000000..933f0338e8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.lang; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.lang.CancelHandleImpl.CancellationTokenImpl; + +/** + * Utility class to provide direct access to internals of {@link CancelHandleImpl}. + */ +public final class CancelHandleHelper { + + private CancelHandleHelper() { + + } + + /** + * Attaches a cancellable operation to the given token. A cancellation procedure started by its handle completes + * when {@code completionFut} completes. + * + * <p>NOTE: If a handle, this token is associated with, was cancelled or its cancellation was requested, + * this method immediately invokes {@code cancelAction.run()} and in this case + * <b>it never waits for {@code completionFut} to complete</b>. + * + * <p>The following methods request cancellation of a handle: + * <ul> + * <li>{@link CancelHandle#cancel()}</li> + * <li>{@link CancelHandle#cancelAsync()}</li> + * </ul> + * + * @param token Cancellation token. + * @param cancelAction Action that terminates an operation. + * @param completionFut Future that completes when operation completes and all resources it created are released.
+ */ + public static void addCancelAction( + CancellationToken token, + Runnable cancelAction, + CompletableFuture<?> completionFut + ) { + Objects.requireNonNull(token, "token"); + Objects.requireNonNull(cancelAction, "cancelAction"); + Objects.requireNonNull(completionFut, "completionFut"); + + CancellationTokenImpl t = unwrapToken(token); + t.addCancelAction(cancelAction, completionFut); + } + + private static CancellationTokenImpl unwrapToken(CancellationToken token) { + if (token instanceof CancellationTokenImpl) { + return (CancellationTokenImpl) token; + } else { + throw new IllegalArgumentException("Unexpected CancellationToken: " + token.getClass()); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java new file mode 100644 index 0000000000..c2b8ddb43e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.lang; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.lang.ErrorGroups.Common; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +/** + * Tests for {@link CancelHandleHelper}. + */ +public class CancelHandleHelperSelfTest extends BaseIgniteAbstractTest { + + @Test + public void testCancelSync() throws InterruptedException { + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + // Initially is not cancelled + assertFalse(cancelHandle.isCancelled()); + + CountDownLatch operationLatch = new CountDownLatch(1); + CompletableFuture<Void> cancelFut = new CompletableFuture<>(); + + Runnable cancelAction = () -> { + try { + operationLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + cancelFut.complete(null); + }; + + CancelHandleHelper.addCancelAction(token, cancelAction, cancelFut); + + CountDownLatch cancelHandleLatch = new CountDownLatch(1); + + // Call cancel in another thread. + Thread thread = new Thread(() -> { + cancelHandle.cancel(); + cancelHandleLatch.countDown(); + }); + thread.start(); + + // Make it possible for cancelAction to complete, because cancelHandle calls it in its thread. + operationLatch.countDown(); + + // Wait until sync cancel returns. 
+ cancelHandleLatch.await(); + + // Cancellation has completed + assertTrue(cancelHandle.cancelAsync().isDone()); + assertTrue(cancelHandle.isCancelled()); + assertTrue(cancelHandle.cancelAsync().isDone()); + + // Should have no effect + cancelHandle.cancel(); + } + + @Test + public void testCancelAsync() { + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + // Initially is not cancelled + assertFalse(cancelHandle.isCancelled()); + CountDownLatch operationLatch = new CountDownLatch(1); + CompletableFuture<Void> cancelFut = new CompletableFuture<>(); + + Runnable cancelAction = () -> { + // Run in another thread to avoid blocking. + Thread thread = new Thread(() -> { + try { + operationLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + cancelFut.complete(null); + }); + thread.start(); + }; + + CancelHandleHelper.addCancelAction(token, cancelAction, cancelFut); + + // Request cancellation and keep the future, to call it later.
+ CompletableFuture<Void> cancelHandleFut = cancelHandle.cancelAsync(); + assertTrue(cancelHandle.isCancelled()); + + assertFalse(cancelHandleFut.isDone()); + operationLatch.countDown(); + + // Wait for cancellation to complete + cancelHandleFut.join(); + + assertTrue(cancelHandle.isCancelled()); + assertTrue(cancelHandle.cancelAsync().isDone()); + } + + @Test + public void testCancelAsyncReturnsCopy() { + CancelHandle cancelHandle = CancelHandle.create(); + + CompletableFuture<Void> f1 = cancelHandle.cancelAsync(); + CompletableFuture<Void> f2 = cancelHandle.cancelAsync(); + assertNotSame(f1, f2); + } + + @Test + public void testRunCancelActionImmediatelyIfCancelSyncCalled() { + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + cancelHandle.cancel(); + assertTrue(cancelHandle.isCancelled()); + + Runnable action = Mockito.mock(Runnable.class); + CompletableFuture<Void> f = new CompletableFuture<>(); + + // Attach it to some operation that hasn't completed yet + CancelHandleHelper.addCancelAction(token, action, f); + verify(action, times(1)).run(); + + cancelHandle.cancelAsync().join(); + // We do not wait for cancellation to complete because + // operation has not started yet.
+ assertFalse(f.isDone()); + + // Action runs immediately + CancelHandleHelper.addCancelAction(token, action, f); + verify(action, times(2)).run(); + } + + @Test + public void testRunCancelActionImmediatelyIfCancelAsyncCalled() { + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + cancelHandle.cancelAsync(); + assertTrue(cancelHandle.isCancelled()); + + Runnable action = Mockito.mock(Runnable.class); + CompletableFuture<Void> f = new CompletableFuture<>(); + + // Attach it to some operation that hasn't completed yet + CancelHandleHelper.addCancelAction(token, action, f); + verify(action, times(1)).run(); + + cancelHandle.cancelAsync().join(); + // We do not wait for cancellation to complete because + // operation has not started yet. + assertFalse(f.isDone()); + + // Action runs immediately + CancelHandleHelper.addCancelAction(token, action, f); + verify(action, times(2)).run(); + } + + @Test + public void testArgumentsMustNotBeNull() { + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + Runnable action = Mockito.mock(Runnable.class); + CompletableFuture<Void> f = CompletableFuture.completedFuture(null); + + { + NullPointerException err = assertThrows( + NullPointerException.class, + () -> CancelHandleHelper.addCancelAction(null, action, f) + ); + assertEquals("token", err.getMessage()); + } + + { + NullPointerException err = assertThrows( + NullPointerException.class, + () -> CancelHandleHelper.addCancelAction(token, null, f) + ); + assertEquals("cancelAction", err.getMessage()); + } + + { + NullPointerException err = assertThrows( + NullPointerException.class, + () -> CancelHandleHelper.addCancelAction(token, action, null) + ); + assertEquals("completionFut", err.getMessage()); + } + } + + @Test + public void testMultipleOperations() { + class Operation { + private final CountDownLatch latch = new CountDownLatch(1); + private final CompletableFuture<Void>
cancelFut = new CompletableFuture<>(); + private final Runnable cancelAction = () -> { + // Run in another thread to avoid blocking. + Thread thread = new Thread(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + cancelFut.complete(null); + }); + thread.start(); + }; + } + + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + Operation operation1 = new Operation(); + Operation operation2 = new Operation(); + + CancelHandleHelper.addCancelAction(token, operation1.cancelAction, operation1.cancelFut); + CancelHandleHelper.addCancelAction(token, operation2.cancelAction, operation2.cancelFut); + + cancelHandle.cancelAsync(); + assertFalse(operation1.cancelFut.isDone()); + + // Cancel the first operation + operation1.latch.countDown(); + operation1.cancelFut.join(); + + // The cancelHandle is still not done + assertFalse(cancelHandle.cancelAsync().isDone()); + + // Cancel the second operation + operation2.latch.countDown(); + operation2.cancelFut.join(); + + cancelHandle.cancelAsync().join(); + assertTrue(cancelHandle.cancelAsync().isDone()); + } + + @Test + public void testExceptionsInCancelActionsAreWrapped() { + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + RuntimeException e1 = new RuntimeException("e1"); + Runnable r1 = () -> { + throw e1; + }; + CompletableFuture<Object> f1 = new CompletableFuture<>(); + + RuntimeException e2 = new RuntimeException("e2"); + Runnable r2 = () -> { + throw e2; + }; + CompletableFuture<Object> f2 = new CompletableFuture<>(); + + CancelHandleHelper.addCancelAction(token, r1, f1); + CancelHandleHelper.addCancelAction(token, r2, f2); + + f1.complete(null); + f2.complete(null); + + IgniteException err = assertThrows(IgniteException.class, cancelHandle::cancel); + + assertEquals("Failed to cancel an operation", err.getMessage()); + assertEquals(Common.INTERNAL_ERR, 
err.code(), err.toString()); + assertEquals(Arrays.asList(e1, e2), Arrays.asList(err.getSuppressed())); + } +} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java index 16117bf584..bde9e1b730 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java @@ -90,7 +90,7 @@ public class AbstractMultiNodeBenchmark { getAllFromCursor( await(queryEngine.queryAsync( - SqlPropertiesHelper.emptyProperties(), igniteImpl.observableTimeTracker(), null, createZoneStatement + SqlPropertiesHelper.emptyProperties(), igniteImpl.observableTimeTracker(), null, null, createZoneStatement )) ); @@ -136,7 +136,7 @@ public class AbstractMultiNodeBenchmark { getAllFromCursor( await(igniteImpl.queryEngine().queryAsync( - SqlPropertiesHelper.emptyProperties(), igniteImpl.observableTimeTracker(), null, createTableStatement + SqlPropertiesHelper.emptyProperties(), igniteImpl.observableTimeTracker(), null, null, createTableStatement )) ); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java index a33198c32c..52de6ec493 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SelectBenchmark.java @@ -236,11 +236,11 @@ public class SelectBenchmark extends AbstractMultiNodeBenchmark { } private Iterator<InternalSqlRow> query(String sql, Object... 
args) { - return handleFirstBatch(queryProc.queryAsync(properties, igniteImpl.observableTimeTracker(), null, sql, args)); + return handleFirstBatch(queryProc.queryAsync(properties, igniteImpl.observableTimeTracker(), null, null, sql, args)); } private Iterator<InternalSqlRow> script(String sql, Object... args) { - return handleFirstBatch(queryProc.queryAsync(scriptProperties, igniteImpl.observableTimeTracker(), null, sql, args)); + return handleFirstBatch(queryProc.queryAsync(scriptProperties, igniteImpl.observableTimeTracker(), null, null, sql, args)); } private Iterator<InternalSqlRow> handleFirstBatch(CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFut) { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java index fe8059283a..2ccb23aa71 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SqlMultiStatementBenchmark.java @@ -380,14 +380,14 @@ public class SqlMultiStatementBenchmark extends AbstractMultiNodeBenchmark { Iterator<InternalSqlRow> execQuery(String sql, Object ... args) { AsyncSqlCursor<InternalSqlRow> cursor = - queryProcessor.queryAsync(props, observableTimeTracker, null, sql, args).join(); + queryProcessor.queryAsync(props, observableTimeTracker, null, null, sql, args).join(); return new InternalResultsIterator(cursor, pageSize); } Iterator<InternalSqlRow> execScript(String sql, Object ... 
args) { AsyncSqlCursor<InternalSqlRow> cursor = - queryProcessor.queryAsync(scriptProps, observableTimeTracker, null, sql, args).join(); + queryProcessor.queryAsync(scriptProps, observableTimeTracker, null, null, sql, args).join(); return new InternalResultsIterator(cursor, pageSize); } diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java index e120ebfa19..f56e4cc5b6 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/restart/RestartProofIgniteSql.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.Ignite; import org.apache.ignite.internal.wrapper.Wrapper; import org.apache.ignite.internal.wrapper.Wrappers; +import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSet; @@ -58,71 +59,126 @@ class RestartProofIgniteSql implements IgniteSql, Wrapper { } @Override - public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String query, @Nullable Object... arguments) { - return attachmentLock.attached(ignite -> ignite.sql().execute(transaction, query, arguments)); + public ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + String query, + @Nullable Object... arguments + ) { + return attachmentLock.attached(ignite -> ignite.sql().execute(transaction, cancellationToken, query, arguments)); } @Override - public ResultSet<SqlRow> execute(@Nullable Transaction transaction, Statement statement, @Nullable Object... 
arguments) { - return attachmentLock.attached(ignite -> ignite.sql().execute(transaction, statement, arguments)); + public ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + Statement statement, + @Nullable Object... arguments + ) { + return attachmentLock.attached(ignite -> ignite.sql().execute( + transaction, + cancellationToken, + statement, + arguments) + ); } @Override public <T> ResultSet<T> execute( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments ) { - return attachmentLock.attached(ignite -> ignite.sql().execute(transaction, mapper, query, arguments)); + return attachmentLock.attached(ignite -> ignite.sql().execute( + transaction, + mapper, + cancellationToken, + query, + arguments) + ); } @Override public <T> ResultSet<T> execute( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... arguments ) { - return attachmentLock.attached(ignite -> ignite.sql().execute(transaction, mapper, statement, arguments)); + return attachmentLock.attached(ignite -> ignite.sql().execute( + transaction, + mapper, + cancellationToken, + statement, + arguments) + ); } @Override public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments ) { - return attachmentLock.attachedAsync(ignite -> ignite.sql().executeAsync(transaction, query, arguments)); + return attachmentLock.attachedAsync(ignite -> ignite.sql().executeAsync( + transaction, + cancellationToken, + query, + arguments) + ); } @Override public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... 
arguments ) { - return attachmentLock.attachedAsync(ignite -> ignite.sql().executeAsync(transaction, statement, arguments)); + return attachmentLock.attachedAsync(ignite -> ignite.sql().executeAsync( + transaction, + cancellationToken, + statement, + arguments) + ); } @Override public <T> CompletableFuture<AsyncResultSet<T>> executeAsync( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments ) { - return attachmentLock.attachedAsync(ignite -> ignite.sql().executeAsync(transaction, mapper, query, arguments)); + return attachmentLock.attachedAsync(ignite -> ignite.sql().executeAsync( + transaction, + mapper, + cancellationToken, + query, + arguments) + ); } @Override public <T> CompletableFuture<AsyncResultSet<T>> executeAsync( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... arguments ) { - return attachmentLock.attachedAsync(ignite -> ignite.sql().executeAsync(transaction, mapper, statement, arguments)); + return attachmentLock.attachedAsync(ignite -> ignite.sql().executeAsync( + transaction, + mapper, + cancellationToken, + statement, + arguments) + ); } @Override diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java index 9446c7ae8d..a888854552 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java @@ -20,28 +20,37 @@ package org.apache.ignite.internal.sql.api; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.junit.jupiter.api.Assertions.assertEquals; import 
static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.ignite.internal.sql.SyncResultSetAdapter; import org.apache.ignite.internal.wrapper.Wrappers; +import org.apache.ignite.lang.CancelHandle; +import org.apache.ignite.lang.CancellationToken; +import org.apache.ignite.lang.ErrorGroups.Sql; import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSet; +import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.sql.Statement; import org.apache.ignite.sql.async.AsyncResultSet; import org.apache.ignite.tx.Transaction; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; /** * Tests for asynchronous SQL API. 
@@ -95,6 +104,95 @@ public class ItSqlAsynchronousApiTest extends ItSqlApiBaseTest { } } + @Test + public void cancelQueryString() throws InterruptedException { + IgniteSql sql = igniteSql(); + String query = "SELECT * FROM system_range(0, 10000000000)"; + + // no transaction + executeAndCancel((token) -> { + return sql.executeAsync(null, token, query); + }); + + // with transaction + executeAndCancel((token) -> { + Transaction transaction = igniteTx().begin(); + + return sql.executeAsync(transaction, token, query); + }); + } + + @Test + public void cancelStatement() throws InterruptedException { + IgniteSql sql = igniteSql(); + + String query = "SELECT * FROM system_range(0, 10000000000)"; + + // no transaction + executeAndCancel((token) -> { + Statement statement = sql.statementBuilder() + .query(query) + .build(); + + return sql.executeAsync(null, token, statement); + }); + + // with transaction + executeAndCancel((token) -> { + Statement statement = sql.statementBuilder() + .query(query) + .build(); + + Transaction transaction = igniteTx().begin(); + + return sql.executeAsync(transaction, token, statement); + }); + } + + private static void executeAndCancel( + Function<CancellationToken, CompletableFuture<AsyncResultSet<SqlRow>>> execute + ) throws InterruptedException { + + CancelHandle cancelHandle = CancelHandle.create(); + CountDownLatch firstResultSetLatch = new CountDownLatch(1); + + CompletableFuture<AsyncResultSet<SqlRow>> resultSetFut = execute.apply(cancelHandle.token()); + + // Wait for the first result set to become available and then cancel the query + resultSetFut.whenComplete((r, t) -> firstResultSetLatch.countDown()); + firstResultSetLatch.await(); + + cancelHandle.cancelAsync(); + + CompletionException err = assertThrows(CompletionException.class, new DrainResultSet(resultSetFut)); + SqlException sqlErr = assertInstanceOf(SqlException.class, err.getCause()); + assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlErr.code()); + + 
cancelHandle.cancelAsync().join(); + } + + private static class DrainResultSet implements Executable { + CompletableFuture<? extends AsyncResultSet<SqlRow>> rs; + + DrainResultSet(CompletableFuture<? extends AsyncResultSet<SqlRow>> rs) { + this.rs = rs; + } + + @Override + public void execute() { + AsyncResultSet<SqlRow> current; + do { + current = rs.join(); + if (current.hasRowSet() && current.hasMorePages()) { + rs = current.fetchNextPage(); + } else { + return; + } + + } while (current.hasMorePages()); + } + } + @Override protected ResultSet<SqlRow> executeForRead(IgniteSql sql, Transaction tx, Statement statement, Object... args) { return new SyncResultSetAdapter(await(sql.executeAsync(tx, statement, args))); diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java index 22564c4121..8a16f708f1 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java @@ -23,6 +23,7 @@ import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.tx.IgniteTransactions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; /** * Tests for asynchronous client SQL API. 
@@ -40,6 +41,18 @@ public class ItSqlClientAsynchronousApiTest extends ItSqlAsynchronousApiTest { client.close(); } + @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646") + @Override + public void cancelQueryString() throws InterruptedException { + super.cancelQueryString(); + } + + @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646") + @Override + public void cancelStatement() throws InterruptedException { + super.cancelStatement(); + } + @Override protected IgniteSql igniteSql() { return client.sql(); diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java index 283fb792e6..cdcba98bb2 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java @@ -23,6 +23,7 @@ import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.tx.IgniteTransactions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; /** * Tests for synchronous client SQL API. 
@@ -40,6 +41,18 @@ public class ItSqlClientSynchronousApiTest extends ItSqlSynchronousApiTest { client.close(); } + @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646") + @Override + public void cancelQueryString() throws InterruptedException { + super.cancelQueryString(); + } + + @Disabled("https://issues.apache.org/jira/browse/IGNITE-23646") + @Override + public void cancelStatement() throws InterruptedException { + super.cancelStatement(); + } + @Override protected IgniteSql igniteSql() { return client.sql(); diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java index 329f82745e..f7cb46f1d9 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java @@ -19,20 +19,108 @@ package org.apache.ignite.internal.sql.api; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.function.Function; +import org.apache.ignite.lang.CancelHandle; +import org.apache.ignite.lang.CancellationToken; +import org.apache.ignite.lang.ErrorGroups.Sql; import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSet; +import org.apache.ignite.sql.SqlException; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.sql.Statement; import 
org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; /** * Tests for synchronous SQL API. */ public class ItSqlSynchronousApiTest extends ItSqlApiBaseTest { + + private final List<Transaction> transactions = new ArrayList<>(); + + @AfterEach + public void rollbackTransactions() { + transactions.forEach(Transaction::rollback); + } + + @Test + public void cancelQueryString() throws InterruptedException { + IgniteSql sql = igniteSql(); + String query = "SELECT * FROM system_range(0, 10000000000)"; + + // no transaction + executeAndCancel((token) -> { + Statement statement = sql.statementBuilder() + .query(query) + .build(); + + return sql.execute(null, token, statement); + }); + + // with transaction + executeAndCancel((token) -> { + Statement statement = sql.statementBuilder() + .query(query) + .build(); + + Transaction transaction = igniteTx().begin(); + return sql.execute(transaction, token, statement); + }); + } + + @Test + public void cancelStatement() throws InterruptedException { + IgniteSql sql = igniteSql(); + String query = "SELECT * FROM system_range(0, 10000000000)"; + + // no transaction + executeAndCancel((token) -> { + return sql.execute(null, token, query); + }); + + // with transaction + executeAndCancel((token) -> { + Transaction transaction = igniteTx().begin(); + return sql.execute(transaction, token, query); + }); + } + + private static void executeAndCancel(Function<CancellationToken, ResultSet<SqlRow>> execute) throws InterruptedException { + CancelHandle cancelHandle = CancelHandle.create(); + + CountDownLatch latch = new CountDownLatch(1); + + // Run statement in another thread + CompletableFuture<Void> f = CompletableFuture.runAsync(() -> { + try (ResultSet<SqlRow> resultSet = execute.apply(cancelHandle.token())) { + // Start a query and cancel it later. 
+ latch.countDown(); + while (resultSet.hasNext()) { + resultSet.next(); + } + } + }); + + latch.await(); + cancelHandle.cancelAsync(); + + CompletionException err = assertThrows(CompletionException.class, f::join); + SqlException sqlErr = assertInstanceOf(SqlException.class, err.getCause()); + assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlErr.code()); + + cancelHandle.cancelAsync().join(); + } + @Override protected ResultSet<SqlRow> executeForRead(IgniteSql sql, Transaction tx, Statement statement, Object... args) { return sql.execute(tx, statement, args); diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java index 23ed7fc3d8..9f40678fd1 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/BaseSqlMultiStatementTest.java @@ -82,7 +82,7 @@ public abstract class BaseSqlMultiStatementTest extends BaseSqlIntegrationTest { .build(); AsyncSqlCursor<InternalSqlRow> cursor = await( - queryProcessor().queryAsync(properties, observableTimeTracker(), tx, query, params) + queryProcessor().queryAsync(properties, observableTimeTracker(), tx, null, query, params) ); return Objects.requireNonNull(cursor); diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItQueryCancelTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItQueryCancelTest.java new file mode 100644 index 0000000000..6c47f523e5 --- /dev/null +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItQueryCancelTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine; + +import static org.apache.ignite.internal.sql.engine.QueryProperty.ALLOWED_QUERY_TYPES; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.apache.ignite.internal.TestWrappers; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.sql.BaseSqlIntegrationTest; +import org.apache.ignite.internal.sql.engine.property.SqlProperties; +import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper; +import org.apache.ignite.internal.tx.HybridTimestampTracker; +import org.apache.ignite.internal.util.AsyncCursor.BatchedResult; +import org.apache.ignite.lang.CancelHandle; +import org.apache.ignite.lang.CancellationToken; +import org.apache.ignite.lang.ErrorGroups.Sql; +import org.apache.ignite.sql.SqlException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +/** Set of test cases for query cancellation. 
*/ +public class ItQueryCancelTest extends BaseSqlIntegrationTest { + + /** Calling {@link CancelHandle#cancel()} should cancel execution of a single query. */ + @Test + public void testCancelSingleQuery() { + IgniteImpl igniteImpl = TestWrappers.unwrapIgniteImpl(CLUSTER.node(0)); + + SqlProperties properties = SqlPropertiesHelper.newBuilder() + .set(ALLOWED_QUERY_TYPES, Set.of(SqlQueryType.QUERY)) + .build(); + + HybridTimestampTracker hybridTimestampTracker = igniteImpl.observableTimeTracker(); + + SqlQueryProcessor qryProc = queryProcessor(); + + StringBuilder query = new StringBuilder(); + + query.append("SELECT v FROM (VALUES "); + for (int i = 0; i < 100; i++) { + if (i > 0) { + query.append(", "); + } + query.append("("); + query.append(i); + query.append(")"); + } + query.append(" ) t(v)"); + + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + AsyncSqlCursor<InternalSqlRow> query1 = qryProc.queryAsync( + properties, + hybridTimestampTracker, + null, + token, + query.toString() + ).join(); + + assertEquals(1, qryProc.runningQueries()); + + // Request cancellation. + CompletableFuture<Void> cancelled = cancelHandle.cancelAsync(); + + // Observe cancellation error + expectQueryCancelled(new DrainCursor(query1)); + + cancelled.join(); + + assertEquals(0, qryProc.runningQueries()); + } + + /** Calling {@link CancelHandle#cancel()} should cancel execution of multiple queries. 
*/ + @Test + public void testCancelMultipleQueries() { + IgniteImpl igniteImpl = TestWrappers.unwrapIgniteImpl(CLUSTER.node(0)); + + SqlProperties properties = SqlPropertiesHelper.newBuilder() + .set(ALLOWED_QUERY_TYPES, Set.of(SqlQueryType.QUERY)) + .build(); + + HybridTimestampTracker hybridTimestampTracker = igniteImpl.observableTimeTracker(); + + SqlQueryProcessor qryProc = queryProcessor(); + + StringBuilder query = new StringBuilder(); + + query.append("SELECT v FROM (VALUES "); + for (int i = 0; i < 100; i++) { + if (i > 0) { + query.append(", "); + } + query.append("("); + query.append(i); + query.append(")"); + } + query.append(" ) t(v)"); + + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + AsyncSqlCursor<InternalSqlRow> query1 = qryProc.queryAsync( + properties, + hybridTimestampTracker, + null, + token, + query.toString() + ).join(); + + AsyncSqlCursor<InternalSqlRow> query2 = qryProc.queryAsync( + properties, + hybridTimestampTracker, + null, + token, + query.toString() + ).join(); + + assertEquals(2, qryProc.runningQueries()); + + // Request cancellation. + CompletableFuture<Void> cancelled = cancelHandle.cancelAsync(); + + // Observe cancellation errors + expectQueryCancelled(new DrainCursor(query1)); + expectQueryCancelled(new DrainCursor(query2)); + + cancelled.join(); + + assertEquals(0, qryProc.runningQueries()); + } + + /** Starting a query with a cancelled token should trigger query cancellation. 
*/ + @Test + public void testQueryWontStartWhenHandleIsCancelled() { + IgniteImpl igniteImpl = TestWrappers.unwrapIgniteImpl(CLUSTER.node(0)); + + SqlProperties properties = SqlPropertiesHelper.newBuilder() + .set(ALLOWED_QUERY_TYPES, Set.of(SqlQueryType.QUERY)) + .build(); + + HybridTimestampTracker hybridTimestampTracker = igniteImpl.observableTimeTracker(); + + SqlQueryProcessor qryProc = queryProcessor(); + + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + cancelHandle.cancel(); + + Runnable run = () -> qryProc.queryAsync( + properties, + hybridTimestampTracker, + null, + token, + "SELECT 1" + ).join(); + + expectQueryCancelledWithQueryCancelledException(run); + + assertEquals(0, qryProc.runningQueries()); + } + + /** Calling {@link CancelHandle#cancel()} should cancel execution of queries that use executable plans. */ + @ParameterizedTest + @ValueSource(strings = { + "SELECT * FROM t WHERE id = 1", + "INSERT INTO t VALUES (1, 1)", + "SELECT COUNT(*) FROM t", + }) + public void testExecutablePlans(String query) { + sql("CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, val INT)"); + + IgniteImpl igniteImpl = TestWrappers.unwrapIgniteImpl(CLUSTER.node(0)); + + SqlProperties properties = SqlPropertiesHelper.newBuilder() + .set(ALLOWED_QUERY_TYPES, Set.of(SqlQueryType.QUERY, SqlQueryType.DML)) + .build(); + + HybridTimestampTracker hybridTimestampTracker = igniteImpl.observableTimeTracker(); + + SqlQueryProcessor qryProc = queryProcessor(); + + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + Runnable run = () -> qryProc.queryAsync( + properties, + hybridTimestampTracker, + null, + token, + query + ).join(); + + // Request cancellation + CompletableFuture<Void> f = cancelHandle.cancelAsync(); + + // Observe cancellation error + expectQueryCancelledWithQueryCancelledException(run); + + f.join(); + + assertEquals(0, qryProc.runningQueries()); + } + + 
private static void expectQueryCancelled(Runnable action) { + CompletionException err = assertThrows(CompletionException.class, action::run); + SqlException sqlErr = assertInstanceOf(SqlException.class, err.getCause()); + assertEquals(Sql.EXECUTION_CANCELLED_ERR, sqlErr.code(), sqlErr.toString()); + } + + // TODO: https://issues.apache.org/jira/browse/IGNITE-23637 remove this method and use expectQueryCancelled instead. + private static void expectQueryCancelledWithQueryCancelledException(Runnable action) { + CompletionException err = assertThrows(CompletionException.class, action::run); + assertInstanceOf(QueryCancelledException.class, err.getCause()); + } + + private static class DrainCursor implements Runnable { + final AsyncSqlCursor<?> cursor; + + DrainCursor(AsyncSqlCursor<?> cursor) { + this.cursor = cursor; + } + + @Override + public void run() { + BatchedResult<?> batch; + do { + batch = cursor.requestNextAsync(1).join(); + } while (!batch.items().isEmpty()); + } + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java index 5a2da1d991..fec0ee6785 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.lang.TraceableException; import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.IgniteSql; @@ -201,11 +202,16 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { /** {@inheritDoc} */ @Override - public ResultSet<SqlRow> execute(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments) { + public ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + String query, + @Nullable Object... arguments + ) { Objects.requireNonNull(query); try { - return new SyncResultSetAdapter<>(executeAsync(transaction, query, arguments).join()); + return new SyncResultSetAdapter<>(executeAsync(transaction, cancellationToken, query, arguments).join()); } catch (CompletionException e) { throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); } @@ -213,11 +219,16 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { /** {@inheritDoc} */ @Override - public ResultSet<SqlRow> execute(@Nullable Transaction transaction, Statement statement, @Nullable Object... arguments) { + public ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + Statement statement, + @Nullable Object... arguments + ) { Objects.requireNonNull(statement); try { - return new SyncResultSetAdapter<>(executeAsync(transaction, statement, arguments).join()); + return new SyncResultSetAdapter<>(executeAsync(transaction, cancellationToken, statement, arguments).join()); } catch (CompletionException e) { throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); } @@ -225,15 +236,16 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { /** {@inheritDoc} */ @Override - public <T> ResultSet<T> execute( + public <T> ResultSet<T> execute( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, String query, @Nullable Object... 
arguments) { Objects.requireNonNull(query); try { - return new SyncResultSetAdapter<>(executeAsync(transaction, mapper, query, arguments).join()); + return new SyncResultSetAdapter<>(executeAsync(transaction, mapper, cancellationToken, query, arguments).join()); } catch (CompletionException e) { throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); } @@ -241,9 +253,10 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { /** {@inheritDoc} */ @Override - public <T> ResultSet<T> execute( + public <T> ResultSet<T> execute( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... arguments) { Objects.requireNonNull(statement); @@ -291,26 +304,32 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { @Override public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments ) { - return executeAsyncInternal(transaction, createStatement(query), arguments); + return executeAsyncInternal(transaction, cancellationToken, createStatement(query), arguments); } /** {@inheritDoc} */ @Override public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... arguments ) { - return executeAsyncInternal(transaction, statement, arguments); + return executeAsyncInternal(transaction, cancellationToken, statement, arguments); } /** {@inheritDoc} */ @Override - public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(@Nullable Transaction transaction, @Nullable Mapper<T> mapper, - String query, @Nullable Object... 
arguments) { + public <T> CompletableFuture<AsyncResultSet<T>> executeAsync( + @Nullable Transaction transaction, + @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, + String query, @Nullable Object... arguments + ) { // TODO: IGNITE-18695. throw new UnsupportedOperationException("Not implemented yet."); } @@ -320,14 +339,17 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { public <T> CompletableFuture<AsyncResultSet<T>> executeAsync( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, Statement statement, - @Nullable Object... arguments) { + @Nullable Object... arguments + ) { // TODO: IGNITE-18695. throw new UnsupportedOperationException("Not implemented yet."); } private CompletableFuture<AsyncResultSet<SqlRow>> executeAsyncInternal( @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... arguments ) { @@ -347,7 +369,12 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { .build(); result = queryProcessor.queryAsync( - properties, observableTimestampTracker, (InternalTransaction) transaction, statement.query(), arguments + properties, + observableTimestampTracker, + (InternalTransaction) transaction, + cancellationToken, + statement.query(), + arguments ).thenCompose(cur -> { if (!busyLock.enterBusy()) { cur.closeAsync(); @@ -459,7 +486,7 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { } try { - return queryProcessor.queryAsync(properties0, observableTimestampTracker, transaction, query, args) + return queryProcessor.queryAsync(properties0, observableTimestampTracker, transaction, null, query, args) .thenCompose(cursor -> { if (!enterBusy.get()) { cursor.closeAsync(); @@ -578,8 +605,14 @@ public class IgniteSqlImpl implements IgniteSql, IgniteComponent { .set(QueryProperty.ALLOWED_QUERY_TYPES, SqlQueryType.ALL) .build()); - 
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> f = - queryProcessor.queryAsync(properties0, observableTimestampTracker, null, query, arguments); + CompletableFuture<AsyncSqlCursor<InternalSqlRow>> f = queryProcessor.queryAsync( + properties0, + observableTimestampTracker, + null, + null, + query, + arguments + ); CompletableFuture<Void> resFut = new CompletableFuture<>(); ScriptHandler handler = new ScriptHandler(resFut, enterBusy, leaveBusy); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java index 6d3a33c2b9..24e774c867 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/PublicApiThreadingIgniteSql.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executor; import java.util.function.Supplier; import org.apache.ignite.internal.thread.PublicApiThreading; import org.apache.ignite.internal.wrapper.Wrapper; +import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.sql.BatchedArguments; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSet; @@ -61,71 +62,87 @@ public class PublicApiThreadingIgniteSql implements IgniteSql, Wrapper { } @Override - public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String query, @Nullable Object... arguments) { - return execUserSyncOperation(() -> sql.execute(transaction, query, arguments)); + public ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + String query, + @Nullable Object... 
arguments + ) { + return execUserSyncOperation(() -> sql.execute(transaction, cancellationToken, query, arguments)); } @Override - public ResultSet<SqlRow> execute(@Nullable Transaction transaction, Statement statement, @Nullable Object... arguments) { - return execUserSyncOperation(() -> sql.execute(transaction, statement, arguments)); + public ResultSet<SqlRow> execute( + @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, + Statement statement, + @Nullable Object... arguments + ) { + return execUserSyncOperation(() -> sql.execute(transaction, cancellationToken, statement, arguments)); } @Override public <T> ResultSet<T> execute( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments ) { - return execUserSyncOperation(() -> sql.execute(transaction, mapper, query, arguments)); + return execUserSyncOperation(() -> sql.execute(transaction, mapper, cancellationToken, query, arguments)); } @Override public <T> ResultSet<T> execute( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... arguments ) { - return execUserSyncOperation(() -> sql.execute(transaction, mapper, statement, arguments)); + return execUserSyncOperation(() -> sql.execute(transaction, mapper, cancellationToken, statement, arguments)); } @Override public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, String query, @Nullable Object... 
arguments ) { - return doAsyncOperationForResultSet(() -> sql.executeAsync(transaction, query, arguments)); + return doAsyncOperationForResultSet(() -> sql.executeAsync(transaction, cancellationToken, query, arguments)); } @Override public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync( @Nullable Transaction transaction, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... arguments ) { - return doAsyncOperationForResultSet(() -> sql.executeAsync(transaction, statement, arguments)); + return doAsyncOperationForResultSet(() -> sql.executeAsync(transaction, cancellationToken, statement, arguments)); } @Override public <T> CompletableFuture<AsyncResultSet<T>> executeAsync( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments ) { - return doAsyncOperationForResultSet(() -> sql.executeAsync(transaction, mapper, query, arguments)); + return doAsyncOperationForResultSet(() -> sql.executeAsync(transaction, mapper, cancellationToken, query, arguments)); } @Override public <T> CompletableFuture<AsyncResultSet<T>> executeAsync( @Nullable Transaction transaction, @Nullable Mapper<T> mapper, + @Nullable CancellationToken cancellationToken, Statement statement, @Nullable Object... 
arguments ) { - return doAsyncOperationForResultSet(() -> sql.executeAsync(transaction, mapper, statement, arguments)); + return doAsyncOperationForResultSet(() -> sql.executeAsync(transaction, mapper, cancellationToken, statement, arguments)); } @Override diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java index 65ce14b2b0..1b1fb5ab66 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryCancel.java @@ -88,14 +88,14 @@ public class QueryCancel { } /** Returns {@code true} if the cancellation procedure has already been started. */ - public synchronized boolean isCancelled() { + public boolean isCancelled() { return state.isDone(); } private static void throwException(Reason reason) { throw new QueryCancelledException( reason == Reason.TIMEOUT - ? QueryCancelledException.TIMEOUT_MSG + ? 
QueryCancelledException.TIMEOUT_MSG : QueryCancelledException.CANCEL_MSG ); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java index 38471488c9..68b30ec977 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata; import org.apache.ignite.internal.sql.engine.property.SqlProperties; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.lang.IgniteException; import org.jetbrains.annotations.Nullable; @@ -32,21 +33,23 @@ import org.jetbrains.annotations.Nullable; public interface QueryProcessor extends IgniteComponent { /** - * Returns columns and parameters metadata for the given statement. - * This method uses optional array of parameters to assist with type inference. + * Returns columns and parameters metadata for the given statement. This method uses optional array of parameters to assist with type + * inference. * * @param properties User query properties. See {@link QueryProperty} for available properties. * @param transaction A transaction to use to resolve a schema. * @param qry Single statement SQL query. * @param params Query parameters. * @return Query metadata. - * * @throws IgniteException in case of an error. * @see QueryProperty */ - CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties properties, + CompletableFuture<QueryMetadata> prepareSingleAsync( + SqlProperties properties, @Nullable InternalTransaction transaction, - String qry, Object... params); + String qry, + Object... 
params + ); /** * Execute the query with given schema name and parameters. @@ -55,6 +58,7 @@ public interface QueryProcessor extends IgniteComponent { * @param observableTime Tracker of the latest time observed by client. * @param transaction A transaction to use for query execution. If null, an implicit transaction * will be started by provided transactions facade. + * @param cancellationToken Cancellation token or {@code null}. * @param qry SQL query. * @param params Query parameters. * @return Sql cursor. @@ -66,6 +70,7 @@ public interface QueryProcessor extends IgniteComponent { SqlProperties properties, HybridTimestampTracker observableTime, @Nullable InternalTransaction transaction, + @Nullable CancellationToken cancellationToken, String qry, Object... params ); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 0c7f9047e8..3174d51c8c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -101,6 +101,7 @@ import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.impl.TransactionInflights; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.sql.SqlException; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -407,6 +408,7 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { SqlProperties properties, HybridTimestampTracker observableTimeTracker, @Nullable InternalTransaction transaction, + @Nullable CancellationToken cancellationToken, String qry, Object... 
params ) { @@ -418,7 +420,13 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { QueryTransactionContext txContext = new QueryTransactionContextImpl(txManager, observableTimeTracker, transaction, txTracker); - return queryExecutor.executeQuery(properties, txContext, qry, params); + return queryExecutor.executeQuery( + properties, + txContext, + qry, + cancellationToken, + params + ); } finally { busyLock.leaveBusy(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index 229cd17c86..f582767d09 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.IgniteStringBuilder; +import org.apache.ignite.internal.lang.SqlExceptionMapperUtil; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.TopologyEventHandler; @@ -304,7 +305,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve assert cancelHandler != null; - // This call triggers a timeout exception, if operation has timed out. + // This call immediately triggers a cancellation exception if operation has timed out or it has already been cancelled. cancelHandler.add(timeout -> { QueryCompletionReason reason = timeout ? 
QueryCompletionReason.TIMEOUT : QueryCompletionReason.CANCEL; queryManager.close(reason); @@ -463,11 +464,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve assert queryCancel != null; queryCancel.add(timeout -> { - if (!timeout) { - return; + if (timeout) { + ret.completeExceptionally(new QueryCancelledException(QueryCancelledException.TIMEOUT_MSG)); } - - ret.completeExceptionally(new QueryCancelledException(QueryCancelledException.TIMEOUT_MSG)); }); return new IteratorToDataCursorAdapter<>(ret, Runnable::run); @@ -976,6 +975,22 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve private AsyncCursor<InternalSqlRow> execute(InternalTransaction tx, MultiStepPlan multiStepPlan) { assert root != null; + // Query has already been cancelled, return immediately. + if (cancelled.get()) { + return new AsyncCursor<>() { + @Override + public CompletableFuture<BatchedResult<InternalSqlRow>> requestNextAsync(int rows) { + Throwable t = SqlExceptionMapperUtil.mapToPublicSqlException(new QueryCancelledException()); + return CompletableFuture.failedFuture(t); + } + + @Override + public CompletableFuture<Void> closeAsync() { + return DistributedQueryManager.this.cancelFut; + } + }; + } + boolean mapOnBackups = tx.isReadOnly(); MappingParameters mappingParameters = MappingParameters.create(ctx.parameters(), mapOnBackups); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java index 6154a57958..b7a6414a92 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java @@ -38,7 +38,7 @@ import org.jetbrains.annotations.Nullable; /** * Represents a query initiated on current node. 
- * + * * <p>Encapsulates intermediate state populated throughout query lifecycle. */ class Query implements Runnable { @@ -170,7 +170,7 @@ class Query implements Runnable { return onPhaseStartedCallback.computeIfAbsent(phase, k -> new CompletableFuture<>()); } - /** Moves the query to a given state. */ + /** Moves the query to a given state. */ void moveTo(ExecutionPhase newPhase) { synchronized (mux) { assert currentPhase.transitionAllowed(newPhase) : "currentPhase=" + currentPhase + ", newPhase=" + newPhase; diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java index c955914834..6386895cdb 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java @@ -55,6 +55,8 @@ import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper; import org.apache.ignite.internal.sql.engine.util.cache.Cache; import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.apache.ignite.lang.CancelHandleHelper; +import org.apache.ignite.lang.CancellationToken; import org.jetbrains.annotations.Nullable; /** @@ -128,6 +130,7 @@ public class QueryExecutor implements LifecycleAware { * @param properties User query properties. See {@link QueryProperty} for available properties. * @param txContext Transactional context to use. * @param sql Query string. + * @param cancellationToken Cancellation token. * @param params Query parameters. * @return Future which will be completed with cursor. */ @@ -135,7 +138,8 @@ public class QueryExecutor implements LifecycleAware { SqlProperties properties, QueryTransactionContext txContext, String sql, - Object... 
params + @Nullable CancellationToken cancellationToken, + Object[] params ) { SqlProperties properties0 = SqlPropertiesHelper.chain(properties, defaultProperties); @@ -155,7 +159,7 @@ public class QueryExecutor implements LifecycleAware { } try { - trackQuery(query); + trackQuery(query, cancellationToken); } finally { busyLock.leaveBusy(); } @@ -199,7 +203,7 @@ public class QueryExecutor implements LifecycleAware { } try { - trackQuery(query); + trackQuery(query, null); } finally { busyLock.leaveBusy(); } @@ -310,13 +314,19 @@ public class QueryExecutor implements LifecycleAware { ); } - private void trackQuery(Query query) { + private void trackQuery(Query query, @Nullable CancellationToken cancellationToken) { Query old = runningQueries.put(query.id, query); assert old == null : "Query with the same id already registered"; - query.onPhaseStarted(ExecutionPhase.TERMINATED) - .whenComplete((ignored, ex) -> runningQueries.remove(query.id)); + CompletableFuture<Void> queryTerminationFut = query.onPhaseStarted(ExecutionPhase.TERMINATED); + CompletableFuture<Void> queryTerminationDoneFut = queryTerminationFut.whenComplete((ignored, ex) -> { + runningQueries.remove(query.id); + }); + + if (cancellationToken != null) { + CancelHandleHelper.addCancelAction(cancellationToken, query.cancel::cancel, queryTerminationDoneFut); + } } /** Returns list of queries registered on server at the moment. 
*/ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java index 690c23d767..7259223588 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java @@ -231,8 +231,6 @@ public class PrepareServiceImpl implements PrepareService { return CompletableFuture.failedFuture(new SchemaNotFoundException(schemaName)); } - // Add an action to trigger planner timeout, when operation times out. - // Or trigger timeout immediately if operation has already timed out. QueryCancel cancelHandler = operationContext.cancel(); assert cancelHandler != null; boolean explicitTx = operationContext.txContext() != null && operationContext.txContext().explicitTx() != null; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java index 198bfedff5..3d7dc79a64 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java @@ -98,7 +98,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest { when(result.onClose()) .thenReturn(closeFuture); - when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class))) + when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), any(Object[].class))) .thenReturn(completedFuture(result)); AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1")); @@ -121,7 +121,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest { when(result.onClose()).thenReturn(new CompletableFuture<>()); 
when(result.closeAsync()).thenReturn(nullCompletedFuture()); - when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class))) + when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), any(Object[].class))) .thenReturn(completedFuture(result)); AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1")); @@ -139,7 +139,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest { void resultSetIsNotCreatedIfComponentIsStoppedInMiddleOfOperation() throws Exception { CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture = new CompletableFuture<>(); CountDownLatch executeQueryLatch = new CountDownLatch(1); - when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class))) + when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), any(Object[].class))) .thenAnswer(ignored -> { executeQueryLatch.countDown(); return cursorFuture; @@ -175,7 +175,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest { CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture = new CompletableFuture<>(); CountDownLatch executeQueryLatch = new CountDownLatch(3); - when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class))) + when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), any(Object[].class))) .thenAnswer(ignored -> { executeQueryLatch.countDown(); @@ -206,7 +206,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest { () -> await(result) ); assertThat(igniteSql.openedCursors(), empty()); - verify(queryProcessor, times(3)).queryAsync(any(), any(), any(), any(), any(Object[].class)); + verify(queryProcessor, times(3)).queryAsync(any(), any(), any(), any(), any(), any(Object[].class)); verify(cursor).closeAsync(); } @@ -232,7 +232,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest { when(result.onClose()) .thenReturn(closeFuture); - when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class))) + when(queryProcessor.queryAsync(any(), any(), 
any(), any(), any(), any(Object[].class))) .thenReturn(completedFuture(result)); AsyncResultSet<?> rs = await(igniteSql.executeAsync(null, "SELECT 1")); @@ -264,7 +264,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest { when(cursor2.hasNextResult()).thenReturn(false); when(cursor2.closeAsync()).thenReturn(nullCompletedFuture()); - when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class))) + when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), any(Object[].class))) .thenReturn(completedFuture(cursor1)); Void rs = await(igniteSql.executeScriptAsync("SELECT 1; SELECT 2")); @@ -281,7 +281,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest { when(cursor1.nextResult()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("Broken"))); when(cursor1.closeAsync()).thenReturn(nullCompletedFuture()); - when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class))) + when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), any(Object[].class))) .thenReturn(completedFuture(cursor1)); assertThrowsSqlException( @@ -310,7 +310,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest { when(cursor2.nextResult()).thenReturn(CompletableFuture.failedFuture(lastCursorScriptException)); when(cursor2.closeAsync()).thenReturn(CompletableFuture.failedFuture(cursorCloseException2)); - when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class))) + when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), any(Object[].class))) .thenReturn(completedFuture(cursor1)); SqlException sqlEx = assertThrowsExactly(SqlException.class, () -> await(igniteSql.executeScriptAsync("SELECT 1; SELECT 2"))); @@ -336,7 +336,7 @@ class IgniteSqlImplTest extends BaseIgniteAbstractTest { when(cursor2.closeAsync()).thenReturn(nullCompletedFuture()); - when(queryProcessor.queryAsync(any(), any(), any(), any(), any(Object[].class))) + when(queryProcessor.queryAsync(any(), any(), any(), any(), any(), 
any(Object[].class))) .thenReturn(completedFuture(cursor1)); Thread thread = new Thread(() -> { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java index 6d213cbee0..c26427b329 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.type.NativeTypes; +import org.apache.ignite.lang.CancellationToken; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -120,8 +121,12 @@ public class TransactionEnlistTest extends BaseIgniteAbstractTest { } @Override - public CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties properties, - @Nullable InternalTransaction transaction, String qry, Object... params) { + public CompletableFuture<QueryMetadata> prepareSingleAsync( + SqlProperties properties, + @Nullable InternalTransaction transaction, + String qry, + Object... params + ) { assert params == null || params.length == 0 : "params are not supported"; assert prepareOnly : "Expected that the query will be executed"; @@ -135,6 +140,7 @@ public class TransactionEnlistTest extends BaseIgniteAbstractTest { SqlProperties properties, HybridTimestampTracker observableTimeTracker, @Nullable InternalTransaction transaction, + @Nullable CancellationToken cancellationToken, String qry, Object... 
params ) { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java index 0eb50b9a33..61abdcbd3b 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java @@ -88,6 +88,7 @@ import org.apache.ignite.internal.partitiondistribution.Assignment; import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments; import org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl; import org.apache.ignite.internal.sql.SqlCommon; +import org.apache.ignite.internal.sql.engine.QueryCancel; import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; import org.apache.ignite.internal.sql.engine.exec.ExecutableTable; import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry; @@ -511,6 +512,9 @@ public class TestBuilders { /** Sets the dynamic parameters this fragment will be executed with. */ ExecutionContextBuilder dynamicParameters(Object... params); + /** Sets the query cancellation procedure. */ + ExecutionContextBuilder queryCancel(QueryCancel queryCancel); + /** * Builds the context object. * @@ -526,6 +530,7 @@ public class TestBuilders { private QueryTaskExecutor executor = null; private ClusterNode node = null; private Object[] dynamicParams = ArrayUtils.OBJECT_EMPTY_ARRAY; + private QueryCancel queryCancel = null; /** {@inheritDoc} */ @Override @@ -559,12 +564,20 @@ public class TestBuilders { return this; } + /** {@inheritDoc} */ @Override public ExecutionContextBuilder dynamicParameters(Object... 
params) { this.dynamicParams = params; return this; } + /** {@inheritDoc} */ + @Override + public ExecutionContextBuilder queryCancel(QueryCancel queryCancel) { + this.queryCancel = queryCancel; + return this; + } + /** {@inheritDoc} */ @Override public ExecutionContext<Object[]> build() { @@ -578,7 +591,7 @@ public class TestBuilders { Commons.parametersMap(dynamicParams), TxAttributes.fromTx(new NoOpTransaction(node.name())), SqlQueryProcessor.DEFAULT_TIME_ZONE_ID, - null + queryCancel ); } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java index 7cbdc2ab80..75d4a8ddbd 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.type.NativeTypes; +import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.sql.ColumnMetadata; import org.apache.ignite.sql.ColumnType; import org.jetbrains.annotations.Nullable; @@ -304,8 +305,12 @@ public class QueryCheckerTest extends BaseIgniteAbstractTest { } @Override - public CompletableFuture<QueryMetadata> prepareSingleAsync(SqlProperties properties, - @Nullable InternalTransaction transaction, String qry, Object... params) { + public CompletableFuture<QueryMetadata> prepareSingleAsync( + SqlProperties properties, + @Nullable InternalTransaction transaction, + String qry, + Object... 
params + ) { assert params == null || params.length == 0 : "params are not supported"; assert prepareOnly : "Expected that the query will be executed"; @@ -319,6 +324,7 @@ public class QueryCheckerTest extends BaseIgniteAbstractTest { SqlProperties properties, HybridTimestampTracker observableTimeTracker, @Nullable InternalTransaction transaction, + @Nullable CancellationToken cancellationToken, String qry, Object... params ) { diff --git a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java index 56dd52f50b..2c7ec723e2 100644 --- a/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java +++ b/modules/sql-engine/src/testFixtures/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerImpl.java @@ -327,7 +327,13 @@ abstract class QueryCheckerImpl implements QueryChecker { if (!CollectionUtils.nullOrEmpty(planMatchers)) { CompletableFuture<AsyncSqlCursor<InternalSqlRow>> explainCursors = qryProc.queryAsync( - properties, observableTimeTracker(), tx, "EXPLAIN PLAN FOR " + qry, params); + properties, + observableTimeTracker(), + tx, + null, + "EXPLAIN PLAN FOR " + qry, + params + ); AsyncSqlCursor<InternalSqlRow> explainCursor = await(explainCursors); List<InternalSqlRow> explainRes = getAllFromCursor(explainCursor); @@ -353,7 +359,7 @@ abstract class QueryCheckerImpl implements QueryChecker { // Check result. CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursors = - bypassingThreadAssertionsAsync(() -> qryProc.queryAsync(properties, observableTimeTracker(), tx, qry, params)); + bypassingThreadAssertionsAsync(() -> qryProc.queryAsync(properties, observableTimeTracker(), tx, null, qry, params)); AsyncSqlCursor<InternalSqlRow> cur = await(cursors);