This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ef37739b38f IGNITE-24006 Sql. AssertionError when running
ddlInsideExplicitTransactionFails test (#6889)
ef37739b38f is described below
commit ef37739b38f04c9c7a8560f30a8c05f10a43df83
Author: korlov42 <[email protected]>
AuthorDate: Fri Nov 7 16:03:16 2025 +0200
IGNITE-24006 Sql. AssertionError when running
ddlInsideExplicitTransactionFails test (#6889)
---
.../ignite/client/handler/JdbcHandlerBase.java | 5 +-
.../internal/sql/engine/ItCancelQueryTest.java | 5 +-
.../internal/sql/engine/ItCancelScriptTest.java | 2 +-
.../sql/engine/ItSqlMultiStatementTxTest.java | 14 ++-
.../sql/engine/exec/fsm/ExecutionPhase.java | 2 +-
.../sql/engine/exec/fsm/MultiStatementHandler.java | 5 +-
.../internal/sql/engine/exec/fsm/Program.java | 80 +++++++++++------
...aseHandler.java => ProgramExecutionHandle.java} | 21 +++--
.../sql/engine/exec/fsm/ProgramExecutionState.java | 58 +++++++++++++
.../ignite/internal/sql/engine/exec/fsm/Query.java | 99 ++++++++++++++++++++--
.../sql/engine/exec/fsm/QueryExecutionProgram.java | 6 +-
.../sql/engine/exec/fsm/QueryExecutor.java | 32 ++++---
.../engine/util/SqlExceptionMapperProvider.java | 4 +
.../internal/sql/engine/exec/DdlBatchingTest.java | 5 +-
.../sql/engine/exec/QueryRecoveryTest.java | 6 +-
.../internal/sql/engine/exec/QueryTimeoutTest.java | 4 +-
16 files changed, 271 insertions(+), 77 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
index 0bd33c943ae..48f477babca 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
@@ -37,9 +37,10 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import
org.apache.ignite.internal.sql.engine.TxControlInsideExternalTxNotSupportedException;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.apache.ignite.lang.TraceableException;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.ResultSetMetadata;
@@ -155,7 +156,7 @@ abstract class JdbcHandlerBase {
String errorMessage;
- if (ex instanceof TxControlInsideExternalTxNotSupportedException) {
+ if (ex instanceof TraceableException && ((TraceableException)
ex).code() == Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR) {
errorMessage = "Transaction control statements are not supported
when autocommit mode is disabled";
} else {
errorMessage = getErrorMessage(ex);
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelQueryTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelQueryTest.java
index e04df20db00..7c4209e0234 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelQueryTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelQueryTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled;
-import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelledInternalException;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -170,7 +169,7 @@ public class ItCancelQueryTest extends
BaseSqlIntegrationTest {
"SELECT 1"
));
- expectQueryCancelledInternalException(run);
+ expectQueryCancelled(run);
waitUntilRunningQueriesCount(is(0));
}
@@ -209,7 +208,7 @@ public class ItCancelQueryTest extends
BaseSqlIntegrationTest {
CompletableFuture<Void> f = cancelHandle.cancelAsync();
// Obverse cancellation error
- expectQueryCancelledInternalException(run);
+ expectQueryCancelled(run);
await(f);
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelScriptTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelScriptTest.java
index 4f6c28b8859..19f8c93cc6f 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelScriptTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelScriptTest.java
@@ -124,7 +124,7 @@ public class ItCancelScriptTest extends
BaseSqlMultiStatementTest {
cancelHandle.cancel();
- expectQueryCancelledInternalException(
+ expectQueryCancelled(
() -> runScript(token, "SELECT 1; SELECT 2;")
);
}
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
index e3162e9752a..2faa0a8b0f2 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
@@ -30,7 +30,6 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -39,6 +38,7 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
+import org.apache.ignite.lang.ErrorGroups.Sql;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -351,12 +351,20 @@ public class ItSqlMultiStatementTxTest extends
BaseSqlMultiStatementTest {
@Test
void transactionControlStatementFailsWithExternalTransaction() {
InternalTransaction tx1 = (InternalTransaction) igniteTx().begin();
-
assertThrowsExactly(TxControlInsideExternalTxNotSupportedException.class, () ->
runScript(tx1, null, "COMMIT"));
+ assertThrowsSqlException(
+ Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR,
+ "Transaction control statement cannot be executed within an
external transaction.",
+ () -> runScript(tx1, null, "COMMIT")
+ );
assertEquals(0, txManager().pending());
assertEquals(TxState.ABORTED, tx1.state());
InternalTransaction tx2 = (InternalTransaction) igniteTx().begin();
-
assertThrowsExactly(TxControlInsideExternalTxNotSupportedException.class, () ->
runScript(tx2, null, "START TRANSACTION; COMMIT;"));
+ assertThrowsSqlException(
+ Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR,
+ "Transaction control statement cannot be executed within an
external transaction.",
+ () -> runScript(tx2, null, "START TRANSACTION; COMMIT;")
+ );
assertEquals(0, txManager().pending());
assertEquals(TxState.ABORTED, tx2.state());
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ExecutionPhase.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ExecutionPhase.java
index f9d949a2cc2..89d7816221d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ExecutionPhase.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ExecutionPhase.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec.fsm;
/** Enumerates possible phases of query execution. */
public enum ExecutionPhase {
/** Query is registered on server. */
- REGISTERED(RegisteredPhaseHandler.INSTANCE),
+ REGISTERED(NoOpHandler.INSTANCE),
/** Query string is parsed at the moment. Parsed AST may or may not be
available yet. */
PARSING(ParsingPhaseHandler.INSTANCE),
/** AST is available now, optimization task is submitted. */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
index 33558d59bfa..5b728a0d876 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
@@ -214,8 +214,7 @@ class MultiStatementHandler {
if (lastStatement) {
// Main program is completed, therefore it's safe to
schedule termination of a query
- query.resultHolder
- .thenRun(this::scheduleTermination);
+ scheduleTermination();
} else {
CompletableFuture<Void> triggerFuture;
ScriptStatement nextStatement = statements.peek();
@@ -300,7 +299,7 @@ class MultiStatementHandler {
private void scheduleTermination() {
CompletableFuture.allOf(dependentQueries.toArray(CompletableFuture[]::new))
- .whenComplete((ignored, ex) ->
query.moveTo(ExecutionPhase.TERMINATED));
+ .whenComplete((ignored, ex) -> query.terminate());
}
private static class ScriptStatement {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
index 1996f70cd98..fc41f0fb4a3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
@@ -28,7 +28,6 @@ import java.util.stream.Collectors;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.exec.fsm.Result.Status;
-import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.util.ExceptionUtils;
/**
@@ -65,7 +64,11 @@ class Program<ResultT> {
this.errorHandler = errorHandler;
}
- CompletableFuture<ResultT> run(Query query) {
+ ProgramExecutionState<ResultT> createState() {
+ return new ProgramExecutionState<>(name);
+ }
+
+ void run(Query query, ProgramExecutionState<ResultT> state) {
Result result;
do {
ExecutionPhase phase = query.currentPhase();
@@ -75,17 +78,14 @@ class Program<ResultT> {
} catch (Throwable th) {
// handles exception from synchronous part of phase evaluation
- try {
- if (errorHandler.test(query, th)) {
- continue;
- }
- } catch (AssertionError | Exception ex) {
- LOG.warn("Exception in error handler [queryId={}]", ex,
query.id);
-
- query.onError(th);
+ if (shouldRetry(query, th)) {
+ continue;
}
- return Commons.cast(query.resultHolder);
+ query.setError(th);
+ finalizeActiveProgram(query, state);
+
+ return;
}
if (result.status() == Status.WAITING_FOR_COMPLETION) {
@@ -101,31 +101,55 @@ class Program<ResultT> {
ex = ExceptionUtils.unwrapCause(ex);
// handles exception from asynchronous
part of phase evaluation
- try {
- if (errorHandler.test(query, ex)) {
- query.executor.execute(() ->
run(query));
- }
- } catch (AssertionError | Exception ex0) {
- LOG.warn("Exception in error handler
[queryId={}]", ex0, query.id);
-
- query.onError(ex);
+ if (shouldRetry(query, ex)) {
+ query.executor.execute(() ->
run(query, state));
+ } else {
+ query.setError(ex);
+ finalizeActiveProgram(query, state);
}
return;
}
query.executor.execute(() -> {
- if (advanceQuery(query)) {
- run(query);
+ if (advanceQuery(query, state)) {
+ run(query, state);
}
});
});
break;
}
}
- } while (advanceQuery(query));
+ } while (advanceQuery(query, state));
+ }
+
+ private boolean shouldRetry(Query query, Throwable th) {
+ try {
+ if (errorHandler.test(query, th)) {
+ return true;
+ }
+ } catch (Throwable throwableFromErrorHandler) {
+ LOG.warn("Exception in error handler [queryId={}]",
throwableFromErrorHandler, query.id);
+
+ query.terminateExceptionally(th);
+ }
- return Commons.cast(query.resultHolder);
+ return false;
+ }
+
+ private static void finalizeActiveProgram(Query query,
ProgramExecutionState<?> executionState) {
+ ProgramExecutionHandle activeHandle =
query.activeProgram.getAndSet(null);
+
+ Throwable throwable = query.error.get();
+ if (throwable != null) {
+ // Set error as result of execution.
+ executionState.notifyError(throwable);
+
+ query.terminate();
+ }
+
+ executionState.programFinished.complete(null);
+ assert activeHandle == executionState;
}
/**
@@ -134,7 +158,7 @@ class Program<ResultT> {
* @param query Query to advance.
* @return {@code true} if new state is not terminal (e.g. it does make
sense to continue execution).
*/
- private boolean advanceQuery(Query query) {
+ private boolean advanceQuery(Query query, ProgramExecutionState<ResultT>
state) {
ExecutionPhase phase = query.currentPhase();
Transition transition = transitions.get(phase);
@@ -146,7 +170,13 @@ class Program<ResultT> {
if (terminalPhase.test(query.currentPhase())) {
ResultT result = this.result.apply(query);
- query.resultHolder.complete(result);
+ finalizeActiveProgram(query, state);
+
+ if (!state.resultHolder.complete(result)) {
+ assert state.resultHolder.isCompletedExceptionally();
+
+ query.moveTo(ExecutionPhase.TERMINATED);
+ }
return false;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/RegisteredPhaseHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionHandle.java
similarity index 63%
rename from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/RegisteredPhaseHandler.java
rename to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionHandle.java
index 39d6bf124ee..1e979cce51e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/RegisteredPhaseHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionHandle.java
@@ -17,14 +17,19 @@
package org.apache.ignite.internal.sql.engine.exec.fsm;
-/** Handler that kick-starts query processing. */
-class RegisteredPhaseHandler implements ExecutionPhaseHandler {
- static final ExecutionPhaseHandler INSTANCE = new RegisteredPhaseHandler();
+import java.util.concurrent.CompletableFuture;
- private RegisteredPhaseHandler() { }
+/**
+ * Provides minimal API to communicate with a running {@link Program program}.
+ */
+interface ProgramExecutionHandle {
+ /**
+ * Notifies program execution about an exception related to the query this
program is running for.
+ *
+ * @param error An error to notify about.
+ */
+ void notifyError(Throwable error);
- @Override
- public Result handle(Query query) {
- return Result.completed();
- }
+ /** Returns a future which will be completed successfully once the program has finished. */
+ CompletableFuture<Void> completionFuture();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionState.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionState.java
new file mode 100644
index 00000000000..168988cb57c
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionState.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.fsm;
+
+import static
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Represents a state of program execution.
+ *
+ * <p>That is, strictly typed holder of an execution result.
+ *
+ * @param <ResultT> A type of the execution result.
+ */
+class ProgramExecutionState<ResultT> implements ProgramExecutionHandle {
+ final CompletableFuture<Void> programFinished = new CompletableFuture<>();
+ final CompletableFuture<ResultT> resultHolder = new CompletableFuture<>();
+
+ @SuppressWarnings("FieldCanBeLocal") // Used in toString()
+ private final String programName;
+
+ ProgramExecutionState(String programName) {
+ this.programName = programName;
+ }
+
+ @Override
+ public void notifyError(Throwable th) {
+
resultHolder.completeExceptionally(mapToPublicSqlException(unwrapCause(th)));
+ }
+
+ @Override
+ public CompletableFuture<Void> completionFuture() {
+ return programFinished;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
index 57072002986..23bae293b0e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
@@ -17,11 +17,15 @@
package org.apache.ignite.internal.sql.engine.exec.fsm;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.QueryCancel;
@@ -31,6 +35,8 @@ import
org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
/**
@@ -39,8 +45,11 @@ import org.jetbrains.annotations.Nullable;
* <p>Encapsulates intermediate state populated throughout query lifecycle.
*/
class Query {
+ private static final IgniteLogger LOG = Loggers.forClass(Query.class);
+ private static final int MAX_ATTEMPTS_COUNT = 1024;
+
final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
- volatile CompletableFuture<Object> resultHolder = new
CompletableFuture<>();
+ final AtomicReference<@Nullable ProgramExecutionHandle> activeProgram =
new AtomicReference<>();
// Below are attributes the query was initialized with
final Instant createdAt;
@@ -116,6 +125,69 @@ class Query {
this.parsedResult = parsedResult;
}
+ <ResultT> CompletableFuture<ResultT> runProgram(Program<ResultT> program) {
+ ProgramExecutionState<ResultT> state = program.createState();
+
+ ProgramExecutionHandle currentProgram =
activeProgram.compareAndExchange(null, state);
+ if (currentProgram != null) {
+ String message = format(
+ "Attempt to run query program while another is still
active [runningProgram={}, newProgram={}].",
+ currentProgram, program
+ );
+ throw new SqlException(Common.INTERNAL_ERR, message);
+ }
+
+ program.run(this, state);
+
+ return state.resultHolder;
+ }
+
+ /**
+ * Initiates a graceful termination of the current query.
+ *
+ * <p>If the query is idle, moves it to {@link ExecutionPhase#TERMINATED}
phase immediately. Otherwise, the method waits for the
+ * currently active program to complete before proceeding. Note that
termination may not take effect immediately upon return. To wait
+ * for the actual completion of termination, use {@link
#terminationFuture}.
+ */
+ void terminate() {
+ tryTerminate(1);
+ }
+
+ private void tryTerminate(int attemptNo) {
+ if (attemptNo >= MAX_ATTEMPTS_COUNT) {
+ // Exception thrown from this place most probably will be
swallowed,
+ // therefore logging is chosen.
+ LOG.warn("Unable to terminate query after several attempts. Try to
cancel it explicitly using KILL statement,"
+ + "or restart the node as some resources mays still be
held by this query [queryId={}, attempts={}].", id, attemptNo);
+
+ return;
+ }
+
+ // Already terminated, nothing to do.
+ if (currentPhase == ExecutionPhase.TERMINATED) {
+ return;
+ }
+
+ // Optimistically assume there is no active program, just try to run
TERMINATION program.
+ if (tryRunTerminationProgram()) {
+ return;
+ }
+
+ ProgramExecutionHandle handle = activeProgram.get();
+
+ // Handle may be null if active program completes a moment after we
tried to run TERMINATION program.
+ if (handle != null) {
+ // If active program still exists, wait for completion and try
again.
+ handle.completionFuture()
+ .whenComplete((r, e) -> tryTerminate(attemptNo + 1));
+
+ return;
+ }
+
+ // Active program completed concurrently, just try again.
+ tryTerminate(attemptNo + 1);
+ }
+
/** Moves the query to a given state. */
void moveTo(ExecutionPhase newPhase) {
currentPhase = newPhase;
@@ -129,12 +201,30 @@ class Query {
return currentPhase;
}
- void onError(Throwable th) {
+ void terminateExceptionally(Throwable th) {
setError(th);
- moveTo(ExecutionPhase.TERMINATED);
+ ProgramExecutionHandle handle = activeProgram.get();
+ if (handle != null) {
+ handle.notifyError(th);
+ }
+
+ terminate();
+ }
+
+ private boolean tryRunTerminationProgram() {
+ // Create fake state to prevent concurrent program execution.
+ ProgramExecutionState<Void> state = new
ProgramExecutionState<>("QUERY_TERMINATION");
+
+ if (activeProgram.compareAndSet(null, state)) {
+ moveTo(ExecutionPhase.TERMINATED);
+
+ activeProgram.set(null);
+
+ return true;
+ }
- resultHolder.completeExceptionally(th);
+ return false;
}
void setError(Throwable err) {
@@ -157,7 +247,6 @@ class Query {
}
void reset() {
- resultHolder = new CompletableFuture<>();
error.set(null);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
index db6bcfc0f14..d16d4904c30 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
@@ -84,13 +84,11 @@ class QueryExecutionProgram extends
Program<AsyncSqlCursor<InternalSqlRow>> {
context.excludeNode(exception.nodeName());
- return true;
- } else if (lockConflict(th) || replicaMiss(th) ||
groupOverloaded(th)) {
return true;
}
- }
- query.onError(th);
+ return lockConflict(th) || replicaMiss(th) || groupOverloaded(th);
+ }
return false;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
index c7fba380251..aca00d1c28d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
@@ -202,10 +202,10 @@ public class QueryExecutor implements LifecycleAware,
Debuggable {
query.cancel.setTimeout(scheduler, queryTimeout);
}
- return Programs.QUERY_EXECUTION.run(query)
+ return query.runProgram(Programs.QUERY_EXECUTION)
.whenComplete((cursor, ex) -> {
if (cursor != null && query.parsedScript == null) {
- cursor.onClose().thenRun(() ->
query.moveTo(ExecutionPhase.TERMINATED));
+ cursor.onClose().thenRun(query::terminate);
}
});
}
@@ -242,15 +242,17 @@ public class QueryExecutor implements LifecycleAware,
Debuggable {
try {
parent.cancel.attach(query.cancel);
} catch (QueryCancelledException ex) {
- query.moveTo(ExecutionPhase.TERMINATED);
+ query.terminate();
return failedFuture(ex);
}
- return Programs.SCRIPT_ITEM_EXECUTION.run(query)
+ return query.runProgram(Programs.SCRIPT_ITEM_EXECUTION)
.whenComplete((cursor, ex) -> {
if (cursor != null) {
- cursor.onClose().thenRun(() ->
query.moveTo(ExecutionPhase.TERMINATED));
+ cursor.onClose().thenRun(query::terminate);
+ } else if (ex != null) {
+ query.terminate();
}
});
}
@@ -300,7 +302,7 @@ public class QueryExecutor implements LifecycleAware,
Debuggable {
parent.cancel.attach(query.cancel);
}
} catch (QueryCancelledException ex) {
- queries.forEach(query -> query.onError(ex));
+ queries.forEach(query -> query.terminateExceptionally(ex));
return failedFuture(ex);
} finally {
@@ -309,7 +311,7 @@ public class QueryExecutor implements LifecycleAware,
Debuggable {
List<CompletableFuture<?>> preparedQueryFutures = new
ArrayList<>(batch.size());
for (Query query : queries) {
-
preparedQueryFutures.add(Programs.SCRIPT_ITEM_PREPARATION.run(query));
+
preparedQueryFutures.add(query.runProgram(Programs.SCRIPT_ITEM_PREPARATION));
}
return CompletableFutures.allOf(preparedQueryFutures)
@@ -340,7 +342,7 @@ public class QueryExecutor implements LifecycleAware,
Debuggable {
assert th != null;
while (it.hasNext()) {
- it.next().onError(th);
+ it.next().terminateExceptionally(th);
}
return failedFuture(th);
@@ -359,7 +361,7 @@ public class QueryExecutor implements LifecycleAware,
Debuggable {
Throwable th = null;
for (Query query : queries) {
if (th != null) {
- query.onError(th);
+ query.terminateExceptionally(th);
continue;
}
@@ -395,7 +397,7 @@ public class QueryExecutor implements LifecycleAware,
Debuggable {
firstCursor = currentCursor;
}
- currentCursor.onClose().thenRun(() ->
query.moveTo(ExecutionPhase.TERMINATED));
+
currentCursor.onClose().thenRun(query::terminate);
}
return completedFuture(firstCursor);
@@ -418,10 +420,10 @@ public class QueryExecutor implements LifecycleAware,
Debuggable {
CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture =
lastStep
.whenComplete((none, ex) -> {
if (ex != null) {
- query.onError(ex);
+ query.terminateExceptionally(ex);
}
})
- .thenCompose(none ->
Programs.SCRIPT_ITEM_EXECUTION.run(query)
+ .thenCompose(none ->
query.runProgram(Programs.SCRIPT_ITEM_EXECUTION)
.whenComplete((cursor, ex) -> {
if (cursorRef0 != null) {
if (cursor != null) {
@@ -432,7 +434,9 @@ public class QueryExecutor implements LifecycleAware,
Debuggable {
}
if (cursor != null) {
- cursor.onClose().thenRun(() ->
query.moveTo(ExecutionPhase.TERMINATED));
+ cursor.onClose().thenRun(query::terminate);
+ } else if (ex != null) {
+ query.terminate();
}
}));
@@ -609,7 +613,7 @@ public class QueryExecutor implements LifecycleAware,
Debuggable {
Exception ex = new NodeStoppingException();
- runningQueries.values().forEach(query -> query.onError(ex));
+ runningQueries.values().forEach(query ->
query.terminateExceptionally(ex));
}
static class ParsedResultWithNextCursorFuture {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
index 1508984a20f..ad1fee12f6c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.lang.IgniteExceptionMapper;
import org.apache.ignite.internal.lang.IgniteExceptionMappersProvider;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import
org.apache.ignite.internal.sql.engine.TxControlInsideExternalTxNotSupportedException;
import
org.apache.ignite.internal.sql.engine.exec.RemoteFragmentExecutionException;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.lang.IgniteException;
@@ -76,6 +77,9 @@ public class SqlExceptionMapperProvider implements
IgniteExceptionMappersProvide
mappers.add(unchecked(InternalCompilerException.class,
err -> new SqlException(Common.INTERNAL_ERR, "Expression
compiler error. " + err.getMessage(), err)));
+
mappers.add(unchecked(TxControlInsideExternalTxNotSupportedException.class,
+ err -> new SqlException(err.traceId(), err.code(),
err.getMessage(), err)));
+
return mappers;
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlBatchingTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlBatchingTest.java
index 9c3e2917509..bde3b3964cd 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlBatchingTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlBatchingTest.java
@@ -35,7 +35,6 @@ import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.exec.fsm.QueryInfo;
@@ -374,7 +373,7 @@ public class DdlBatchingTest extends BaseIgniteAbstractTest
{
assertDdlResult(cursor, true);
assertThat(cursor.hasNextResult(), is(true));
assertThat(cursor.nextResult(), willThrowFast(
- CatalogValidationException.class,
+ SqlException.class,
"Table with name 'PUBLIC.T1' already exists"
));
@@ -475,7 +474,7 @@ public class DdlBatchingTest extends BaseIgniteAbstractTest
{
assertDdlResult(cursor, true);
assertThat(cursor.hasNextResult(), is(true));
assertThat(cursor.nextResult(), willThrowFast(
- CatalogValidationException.class,
+ SqlException.class,
"Table with name 'PUBLIC.T1' already exists"
));
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
index 3f90c019be0..c9aa1ba3c82 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
@@ -43,7 +43,6 @@ import
org.apache.ignite.internal.sql.engine.framework.NoOpTransactionalOperatio
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.framework.TestCluster;
import org.apache.ignite.internal.sql.engine.framework.TestNode;
-import org.apache.ignite.internal.sql.engine.message.UnknownNodeException;
import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
@@ -57,6 +56,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -182,9 +182,9 @@ public class QueryRecoveryTest extends
BaseIgniteAbstractTest {
cluster.node(firstExpectedNode).disconnect();
assertThrows(
- UnknownNodeException.class,
+ SqlException.class,
() -> assertQuery(gatewayNode, "SELECT node FROM t1 WHERE
part_id = 0", txContext).check(),
- "Unknown node: " + firstExpectedNode
+ "Node left the cluster. Node: " + firstExpectedNode
);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
index 744ef0323c6..d12cc3cb8f5 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
@@ -117,7 +117,7 @@ public class QueryTimeoutTest extends
BaseIgniteAbstractTest {
ignoreCatalogUpdates.set(true);
assertThrows(
- QueryCancelledException.class,
+ SqlException.class,
() -> gatewayNode.executeQuery(PROPS_WITH_TIMEOUT, "CREATE
TABLE x (id INTEGER PRIMARY KEY, val INTEGER)"),
QueryCancelledException.TIMEOUT_MSG
);
@@ -126,7 +126,7 @@ public class QueryTimeoutTest extends
BaseIgniteAbstractTest {
@Test
void testTimeoutKill() {
assertThrows(
- QueryCancelledException.class,
+ SqlException.class,
() -> gatewayNode.executeQuery(PROPS_WITH_TIMEOUT, "KILL QUERY
'" + randomUUID() + '\''),
QueryCancelledException.TIMEOUT_MSG
);