This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ef37739b38f IGNITE-24006 Sql. AssertionError when running 
ddlInsideExplicitTransactionFails test (#6889)
ef37739b38f is described below

commit ef37739b38f04c9c7a8560f30a8c05f10a43df83
Author: korlov42 <[email protected]>
AuthorDate: Fri Nov 7 16:03:16 2025 +0200

    IGNITE-24006 Sql. AssertionError when running 
ddlInsideExplicitTransactionFails test (#6889)
---
 .../ignite/client/handler/JdbcHandlerBase.java     |  5 +-
 .../internal/sql/engine/ItCancelQueryTest.java     |  5 +-
 .../internal/sql/engine/ItCancelScriptTest.java    |  2 +-
 .../sql/engine/ItSqlMultiStatementTxTest.java      | 14 ++-
 .../sql/engine/exec/fsm/ExecutionPhase.java        |  2 +-
 .../sql/engine/exec/fsm/MultiStatementHandler.java |  5 +-
 .../internal/sql/engine/exec/fsm/Program.java      | 80 +++++++++++------
 ...aseHandler.java => ProgramExecutionHandle.java} | 21 +++--
 .../sql/engine/exec/fsm/ProgramExecutionState.java | 58 +++++++++++++
 .../ignite/internal/sql/engine/exec/fsm/Query.java | 99 ++++++++++++++++++++--
 .../sql/engine/exec/fsm/QueryExecutionProgram.java |  6 +-
 .../sql/engine/exec/fsm/QueryExecutor.java         | 32 ++++---
 .../engine/util/SqlExceptionMapperProvider.java    |  4 +
 .../internal/sql/engine/exec/DdlBatchingTest.java  |  5 +-
 .../sql/engine/exec/QueryRecoveryTest.java         |  6 +-
 .../internal/sql/engine/exec/QueryTimeoutTest.java |  4 +-
 16 files changed, 271 insertions(+), 77 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
index 0bd33c943ae..48f477babca 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcHandlerBase.java
@@ -37,9 +37,10 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import 
org.apache.ignite.internal.sql.engine.TxControlInsideExternalTxNotSupportedException;
 import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.apache.ignite.lang.TraceableException;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.ColumnType;
 import org.apache.ignite.sql.ResultSetMetadata;
@@ -155,7 +156,7 @@ abstract class JdbcHandlerBase {
 
         String errorMessage;
 
-        if (ex instanceof TxControlInsideExternalTxNotSupportedException) {
+        if (ex instanceof TraceableException && ((TraceableException) 
ex).code() == Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR) {
             errorMessage = "Transaction control statements are not supported 
when autocommit mode is disabled";
         } else {
             errorMessage = getErrorMessage(ex);
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelQueryTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelQueryTest.java
index e04df20db00..7c4209e0234 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelQueryTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelQueryTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine;
 
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled;
-import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelledInternalException;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -170,7 +169,7 @@ public class ItCancelQueryTest extends 
BaseSqlIntegrationTest {
                 "SELECT 1"
         ));
 
-        expectQueryCancelledInternalException(run);
+        expectQueryCancelled(run);
 
         waitUntilRunningQueriesCount(is(0));
     }
@@ -209,7 +208,7 @@ public class ItCancelQueryTest extends 
BaseSqlIntegrationTest {
         CompletableFuture<Void> f = cancelHandle.cancelAsync();
 
         // Obverse cancellation error
-        expectQueryCancelledInternalException(run);
+        expectQueryCancelled(run);
 
         await(f);
 
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelScriptTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelScriptTest.java
index 4f6c28b8859..19f8c93cc6f 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelScriptTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCancelScriptTest.java
@@ -124,7 +124,7 @@ public class ItCancelScriptTest extends 
BaseSqlMultiStatementTest {
 
         cancelHandle.cancel();
 
-        expectQueryCancelledInternalException(
+        expectQueryCancelled(
                 () -> runScript(token, "SELECT 1; SELECT 2;")
         );
     }
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
index e3162e9752a..2faa0a8b0f2 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
@@ -30,7 +30,6 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -39,6 +38,7 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -351,12 +351,20 @@ public class ItSqlMultiStatementTxTest extends 
BaseSqlMultiStatementTest {
     @Test
     void transactionControlStatementFailsWithExternalTransaction() {
         InternalTransaction tx1 = (InternalTransaction) igniteTx().begin();
-        
assertThrowsExactly(TxControlInsideExternalTxNotSupportedException.class, () -> 
runScript(tx1, null, "COMMIT"));
+        assertThrowsSqlException(
+                Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR,
+                "Transaction control statement cannot be executed within an 
external transaction.",
+                () -> runScript(tx1, null, "COMMIT")
+        );
         assertEquals(0, txManager().pending());
         assertEquals(TxState.ABORTED, tx1.state());
 
         InternalTransaction tx2 = (InternalTransaction) igniteTx().begin();
-        
assertThrowsExactly(TxControlInsideExternalTxNotSupportedException.class, () -> 
runScript(tx2, null, "START TRANSACTION; COMMIT;"));
+        assertThrowsSqlException(
+                Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR,
+                "Transaction control statement cannot be executed within an 
external transaction.",
+                () -> runScript(tx2, null, "START TRANSACTION; COMMIT;")
+        );
         assertEquals(0, txManager().pending());
         assertEquals(TxState.ABORTED, tx2.state());
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ExecutionPhase.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ExecutionPhase.java
index f9d949a2cc2..89d7816221d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ExecutionPhase.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ExecutionPhase.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec.fsm;
 /** Enumerates possible phases of query execution. */
 public enum ExecutionPhase {
     /** Query is registered on server. */
-    REGISTERED(RegisteredPhaseHandler.INSTANCE),
+    REGISTERED(NoOpHandler.INSTANCE),
     /** Query string is parsed at the moment. Parsed AST may or may not be 
available yet. */
     PARSING(ParsingPhaseHandler.INSTANCE),
     /** AST is available now, optimization task is submitted. */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
index 33558d59bfa..5b728a0d876 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
@@ -214,8 +214,7 @@ class MultiStatementHandler {
 
                 if (lastStatement) {
                     // Main program is completed, therefore it's safe to 
schedule termination of a query
-                    query.resultHolder
-                            .thenRun(this::scheduleTermination);
+                    scheduleTermination();
                 } else {
                     CompletableFuture<Void> triggerFuture;
                     ScriptStatement nextStatement = statements.peek();
@@ -300,7 +299,7 @@ class MultiStatementHandler {
 
     private void scheduleTermination() {
         
CompletableFuture.allOf(dependentQueries.toArray(CompletableFuture[]::new))
-                .whenComplete((ignored, ex) -> 
query.moveTo(ExecutionPhase.TERMINATED));
+                .whenComplete((ignored, ex) -> query.terminate());
     }
 
     private static class ScriptStatement {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
index 1996f70cd98..fc41f0fb4a3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java
@@ -28,7 +28,6 @@ import java.util.stream.Collectors;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.exec.fsm.Result.Status;
-import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.util.ExceptionUtils;
 
 /**
@@ -65,7 +64,11 @@ class Program<ResultT> {
         this.errorHandler = errorHandler;
     }
 
-    CompletableFuture<ResultT> run(Query query) {
+    ProgramExecutionState<ResultT> createState() {
+        return new ProgramExecutionState<>(name);
+    }
+
+    void run(Query query, ProgramExecutionState<ResultT> state) {
         Result result;
         do {
             ExecutionPhase phase = query.currentPhase();
@@ -75,17 +78,14 @@ class Program<ResultT> {
             } catch (Throwable th) {
                 // handles exception from synchronous part of phase evaluation
 
-                try {
-                    if (errorHandler.test(query, th)) {
-                        continue;
-                    }
-                } catch (AssertionError | Exception ex) {
-                    LOG.warn("Exception in error handler [queryId={}]", ex, 
query.id);
-
-                    query.onError(th);
+                if (shouldRetry(query, th)) {
+                    continue;
                 }
 
-                return Commons.cast(query.resultHolder);
+                query.setError(th);
+                finalizeActiveProgram(query, state);
+
+                return;
             }
 
             if (result.status() == Status.WAITING_FOR_COMPLETION) {
@@ -101,31 +101,55 @@ class Program<ResultT> {
                                     ex = ExceptionUtils.unwrapCause(ex);
 
                                     // handles exception from asynchronous 
part of phase evaluation
-                                    try {
-                                        if (errorHandler.test(query, ex)) {
-                                            query.executor.execute(() -> 
run(query));
-                                        }
-                                    } catch (AssertionError | Exception ex0) {
-                                        LOG.warn("Exception in error handler 
[queryId={}]", ex0, query.id);
-
-                                        query.onError(ex);
+                                    if (shouldRetry(query, ex)) {
+                                        query.executor.execute(() -> 
run(query, state));
+                                    } else {
+                                        query.setError(ex);
+                                        finalizeActiveProgram(query, state);
                                     }
 
                                     return;
                                 }
 
                                 query.executor.execute(() -> {
-                                    if (advanceQuery(query)) {
-                                        run(query);
+                                    if (advanceQuery(query, state)) {
+                                        run(query, state);
                                     }
                                 });
                             });
                     break;
                 }
             }
-        } while (advanceQuery(query));
+        } while (advanceQuery(query, state));
+    }
+
+    private boolean shouldRetry(Query query, Throwable th) {
+        try {
+            if (errorHandler.test(query, th)) {
+                return true;
+            }
+        } catch (Throwable throwableFromErrorHandler) {
+            LOG.warn("Exception in error handler [queryId={}]", 
throwableFromErrorHandler, query.id);
+
+            query.terminateExceptionally(th);
+        }
 
-        return Commons.cast(query.resultHolder);
+        return false;
+    }
+
+    private static void finalizeActiveProgram(Query query, 
ProgramExecutionState<?> executionState) {
+        ProgramExecutionHandle activeHandle = 
query.activeProgram.getAndSet(null);
+
+        Throwable throwable = query.error.get();
+        if (throwable != null) {
+            // Set error as result of execution.
+            executionState.notifyError(throwable);
+
+            query.terminate();
+        }
+
+        executionState.programFinished.complete(null);
+        assert activeHandle == executionState;
     }
 
     /**
@@ -134,7 +158,7 @@ class Program<ResultT> {
      * @param query Query to advance.
      * @return {@code true} if new state is not terminal (e.g. it does make 
sense to continue execution).
      */
-    private boolean advanceQuery(Query query) {
+    private boolean advanceQuery(Query query, ProgramExecutionState<ResultT> 
state) {
         ExecutionPhase phase = query.currentPhase();
 
         Transition transition = transitions.get(phase);
@@ -146,7 +170,13 @@ class Program<ResultT> {
         if (terminalPhase.test(query.currentPhase())) {
             ResultT result = this.result.apply(query);
 
-            query.resultHolder.complete(result);
+            finalizeActiveProgram(query, state);
+
+            if (!state.resultHolder.complete(result)) {
+                assert state.resultHolder.isCompletedExceptionally();
+
+                query.moveTo(ExecutionPhase.TERMINATED);
+            }
 
             return false;
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/RegisteredPhaseHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionHandle.java
similarity index 63%
rename from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/RegisteredPhaseHandler.java
rename to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionHandle.java
index 39d6bf124ee..1e979cce51e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/RegisteredPhaseHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionHandle.java
@@ -17,14 +17,19 @@
 
 package org.apache.ignite.internal.sql.engine.exec.fsm;
 
-/** Handler that kick-starts query processing. */
-class RegisteredPhaseHandler implements ExecutionPhaseHandler {
-    static final ExecutionPhaseHandler INSTANCE = new RegisteredPhaseHandler();
+import java.util.concurrent.CompletableFuture;
 
-    private RegisteredPhaseHandler() { }
+/**
+ * Provides minimal API to communicate with a running {@link Program program}.
+ */
+interface ProgramExecutionHandle {
+    /**
+     * Notifies program execution about exception related to query this 
program is running for.
+     *
+     * @param error An error to notify about.
+     */
+    void notifyError(Throwable error);
 
-    @Override
-    public Result handle(Query query) {
-        return Result.completed();
-    }
+    /** Returns a future which will be completed successfully. */
+    CompletableFuture<Void> completionFuture();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionState.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionState.java
new file mode 100644
index 00000000000..168988cb57c
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ProgramExecutionState.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.fsm;
+
+import static 
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Represents a state of program execution.
+ *
+ * <p>That is, strictly typed holder of an execution result.
+ *
+ * @param <ResultT> A type of the execution result.
+ */
+class ProgramExecutionState<ResultT> implements ProgramExecutionHandle {
+    final CompletableFuture<Void> programFinished = new CompletableFuture<>();
+    final CompletableFuture<ResultT> resultHolder = new CompletableFuture<>();
+
+    @SuppressWarnings("FieldCanBeLocal") // Used in toString()
+    private final String programName;
+
+    ProgramExecutionState(String programName) {
+        this.programName = programName;
+    }
+
+    @Override
+    public void notifyError(Throwable th) {
+        
resultHolder.completeExceptionally(mapToPublicSqlException(unwrapCause(th)));
+    }
+
+    @Override
+    public CompletableFuture<Void> completionFuture() {
+        return programFinished;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
index 57072002986..23bae293b0e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java
@@ -17,11 +17,15 @@
 
 package org.apache.ignite.internal.sql.engine.exec.fsm;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
 import java.time.Instant;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.QueryCancel;
@@ -31,6 +35,8 @@ import 
org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -39,8 +45,11 @@ import org.jetbrains.annotations.Nullable;
  * <p>Encapsulates intermediate state populated throughout query lifecycle.
  */
 class Query {
+    private static final IgniteLogger LOG = Loggers.forClass(Query.class);
+    private static final int MAX_ATTEMPTS_COUNT = 1024;
+
     final CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>();
-    volatile CompletableFuture<Object> resultHolder = new 
CompletableFuture<>();
+    final AtomicReference<@Nullable ProgramExecutionHandle> activeProgram = 
new AtomicReference<>();
 
     // Below are attributes the query was initialized with
     final Instant createdAt;
@@ -116,6 +125,69 @@ class Query {
         this.parsedResult = parsedResult;
     }
 
+    <ResultT> CompletableFuture<ResultT> runProgram(Program<ResultT> program) {
+        ProgramExecutionState<ResultT> state = program.createState();
+
+        ProgramExecutionHandle currentProgram = 
activeProgram.compareAndExchange(null, state);
+        if (currentProgram != null) {
+            String message = format(
+                    "Attempt to run query program while another is still 
active [runningProgram={}, newProgram={}].",
+                    currentProgram, program
+            );
+            throw new SqlException(Common.INTERNAL_ERR, message);
+        }
+
+        program.run(this, state);
+
+        return state.resultHolder;
+    }
+
+    /**
+     * Initiates a graceful termination of the current query.
+     *
+     * <p>If the query is idle, moves it to {@link ExecutionPhase#TERMINATED} 
phase immediately. Otherwise, the method waits for the
+     * currently active program to complete before proceeding. Note that 
termination may not take effect immediately upon return. To wait
+     * for the actual completion of termination, use {@link 
#terminationFuture}.
+     */
+    void terminate() {
+        tryTerminate(1);
+    }
+
+    private void tryTerminate(int attemptNo) {
+        if (attemptNo >= MAX_ATTEMPTS_COUNT) {
+            // Exception thrown from this place most probably will be 
swallowed,
+            // therefore logging is chosen.
+            LOG.warn("Unable to terminate query after several attempts. Try to 
cancel it explicitly using KILL statement," 
+                    + "or restart the node as some resources mays still be 
held by this query [queryId={}, attempts={}].", id, attemptNo);
+
+            return;
+        }
+
+        // Already terminated, nothing to do.
+        if (currentPhase == ExecutionPhase.TERMINATED) {
+            return;
+        }
+
+        // Optimistically assume there is no active program, just try to run 
TERMINATION program.
+        if (tryRunTerminationProgram()) {
+            return;
+        }
+
+        ProgramExecutionHandle handle = activeProgram.get();
+
+        // Handle may be null if active program completes a moment after we 
tried to run TERMINATION program.
+        if (handle != null) {
+            // If active program still exists, wait for completion and try 
again.
+            handle.completionFuture()
+                    .whenComplete((r, e) -> tryTerminate(attemptNo + 1));
+
+            return;
+        }
+
+        // Active program competed concurrently, just try again.
+        tryTerminate(attemptNo + 1);
+    }
+
     /** Moves the query to a given state. */
     void moveTo(ExecutionPhase newPhase) {
         currentPhase = newPhase;
@@ -129,12 +201,30 @@ class Query {
         return currentPhase;
     }
 
-    void onError(Throwable th) {
+    void terminateExceptionally(Throwable th) {
         setError(th);
 
-        moveTo(ExecutionPhase.TERMINATED);
+        ProgramExecutionHandle handle = activeProgram.get();
+        if (handle != null) {
+            handle.notifyError(th);
+        }
+
+        terminate();
+    }
+
+    private boolean tryRunTerminationProgram() {
+        // Create fake state to prevent concurrent program execution.
+        ProgramExecutionState<Void> state = new 
ProgramExecutionState<>("QUERY_TERMINATION");
+
+        if (activeProgram.compareAndSet(null, state)) {
+            moveTo(ExecutionPhase.TERMINATED);
+
+            activeProgram.set(null);
+
+            return true;
+        }
 
-        resultHolder.completeExceptionally(th);
+        return false;
     }
 
     void setError(Throwable err) {
@@ -157,7 +247,6 @@ class Query {
     }
 
     void reset() {
-        resultHolder = new CompletableFuture<>();
         error.set(null);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
index db6bcfc0f14..d16d4904c30 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
@@ -84,13 +84,11 @@ class QueryExecutionProgram extends 
Program<AsyncSqlCursor<InternalSqlRow>> {
 
                 context.excludeNode(exception.nodeName());
 
-                return true;
-            } else if (lockConflict(th) || replicaMiss(th) || 
groupOverloaded(th)) {
                 return true;
             }
-        }
 
-        query.onError(th);
+            return lockConflict(th) || replicaMiss(th) || groupOverloaded(th);
+        }
 
         return false;
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
index c7fba380251..aca00d1c28d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java
@@ -202,10 +202,10 @@ public class QueryExecutor implements LifecycleAware, 
Debuggable {
             query.cancel.setTimeout(scheduler, queryTimeout);
         }
 
-        return Programs.QUERY_EXECUTION.run(query)
+        return query.runProgram(Programs.QUERY_EXECUTION)
                 .whenComplete((cursor, ex) -> {
                     if (cursor != null && query.parsedScript == null) {
-                        cursor.onClose().thenRun(() -> 
query.moveTo(ExecutionPhase.TERMINATED));
+                        cursor.onClose().thenRun(query::terminate);
                     }
                 });
     }
@@ -242,15 +242,17 @@ public class QueryExecutor implements LifecycleAware, 
Debuggable {
         try {
             parent.cancel.attach(query.cancel);
         } catch (QueryCancelledException ex) {
-            query.moveTo(ExecutionPhase.TERMINATED);
+            query.terminate();
 
             return failedFuture(ex);
         }
 
-        return Programs.SCRIPT_ITEM_EXECUTION.run(query)
+        return query.runProgram(Programs.SCRIPT_ITEM_EXECUTION)
                 .whenComplete((cursor, ex) -> {
                     if (cursor != null) {
-                        cursor.onClose().thenRun(() -> 
query.moveTo(ExecutionPhase.TERMINATED));
+                        cursor.onClose().thenRun(query::terminate);
+                    } else if (ex != null) {
+                        query.terminate();
                     }
                 });
     }
@@ -300,7 +302,7 @@ public class QueryExecutor implements LifecycleAware, 
Debuggable {
                 parent.cancel.attach(query.cancel);
             }
         } catch (QueryCancelledException ex) {
-            queries.forEach(query -> query.onError(ex));
+            queries.forEach(query -> query.terminateExceptionally(ex));
 
             return failedFuture(ex);
         } finally {
@@ -309,7 +311,7 @@ public class QueryExecutor implements LifecycleAware, 
Debuggable {
 
         List<CompletableFuture<?>> preparedQueryFutures = new 
ArrayList<>(batch.size());
         for (Query query : queries) {
-            
preparedQueryFutures.add(Programs.SCRIPT_ITEM_PREPARATION.run(query));
+            
preparedQueryFutures.add(query.runProgram(Programs.SCRIPT_ITEM_PREPARATION));
         }
 
         return CompletableFutures.allOf(preparedQueryFutures)
@@ -340,7 +342,7 @@ public class QueryExecutor implements LifecycleAware, 
Debuggable {
                         assert th != null;
 
                         while (it.hasNext()) {
-                            it.next().onError(th);
+                            it.next().terminateExceptionally(th);
                         }
 
                         return failedFuture(th);
@@ -359,7 +361,7 @@ public class QueryExecutor implements LifecycleAware, 
Debuggable {
                                 Throwable th = null;
                                 for (Query query : queries) {
                                     if (th != null) {
-                                        query.onError(th);
+                                        query.terminateExceptionally(th);
 
                                         continue;
                                     }
@@ -395,7 +397,7 @@ public class QueryExecutor implements LifecycleAware, 
Debuggable {
                                         firstCursor = currentCursor;
                                     }
 
-                                    currentCursor.onClose().thenRun(() -> 
query.moveTo(ExecutionPhase.TERMINATED));
+                                    
currentCursor.onClose().thenRun(query::terminate);
                                 }
 
                                 return completedFuture(firstCursor);
@@ -418,10 +420,10 @@ public class QueryExecutor implements LifecycleAware, 
Debuggable {
             CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture = 
lastStep
                     .whenComplete((none, ex) -> {
                         if (ex != null) {
-                            query.onError(ex);
+                            query.terminateExceptionally(ex);
                         }
                     })
-                    .thenCompose(none -> 
Programs.SCRIPT_ITEM_EXECUTION.run(query)
+                    .thenCompose(none -> 
query.runProgram(Programs.SCRIPT_ITEM_EXECUTION)
                             .whenComplete((cursor, ex) -> {
                                 if (cursorRef0 != null) {
                                     if (cursor != null) {
@@ -432,7 +434,9 @@ public class QueryExecutor implements LifecycleAware, 
Debuggable {
                                 }
 
                                 if (cursor != null) {
-                                    cursor.onClose().thenRun(() -> 
query.moveTo(ExecutionPhase.TERMINATED));
+                                    cursor.onClose().thenRun(query::terminate);
+                                } else if (ex != null) {
+                                    query.terminate();
                                 }
                             }));
 
@@ -609,7 +613,7 @@ public class QueryExecutor implements LifecycleAware, 
Debuggable {
 
         Exception ex = new NodeStoppingException();
 
-        runningQueries.values().forEach(query -> query.onError(ex));
+        runningQueries.values().forEach(query -> 
query.terminateExceptionally(ex));
     }
 
     static class ParsedResultWithNextCursorFuture {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
index 1508984a20f..ad1fee12f6c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.catalog.CatalogValidationException;
 import org.apache.ignite.internal.lang.IgniteExceptionMapper;
 import org.apache.ignite.internal.lang.IgniteExceptionMappersProvider;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import 
org.apache.ignite.internal.sql.engine.TxControlInsideExternalTxNotSupportedException;
 import 
org.apache.ignite.internal.sql.engine.exec.RemoteFragmentExecutionException;
 import org.apache.ignite.lang.ErrorGroups.Common;
 import org.apache.ignite.lang.IgniteException;
@@ -76,6 +77,9 @@ public class SqlExceptionMapperProvider implements 
IgniteExceptionMappersProvide
         mappers.add(unchecked(InternalCompilerException.class,
                 err -> new SqlException(Common.INTERNAL_ERR, "Expression 
compiler error. " + err.getMessage(), err)));
 
+        
mappers.add(unchecked(TxControlInsideExternalTxNotSupportedException.class, 
+                err -> new SqlException(err.traceId(), err.code(), 
err.getMessage(), err)));
+
         return mappers;
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlBatchingTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlBatchingTest.java
index 9c3e2917509..bde3b3964cd 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlBatchingTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlBatchingTest.java
@@ -35,7 +35,6 @@ import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.catalog.CatalogValidationException;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.exec.fsm.QueryInfo;
@@ -374,7 +373,7 @@ public class DdlBatchingTest extends BaseIgniteAbstractTest 
{
         assertDdlResult(cursor, true);
         assertThat(cursor.hasNextResult(), is(true));
         assertThat(cursor.nextResult(), willThrowFast(
-                CatalogValidationException.class,
+                SqlException.class,
                 "Table with name 'PUBLIC.T1' already exists"
         ));
 
@@ -475,7 +474,7 @@ public class DdlBatchingTest extends BaseIgniteAbstractTest 
{
         assertDdlResult(cursor, true);
         assertThat(cursor.hasNextResult(), is(true));
         assertThat(cursor.nextResult(), willThrowFast(
-                CatalogValidationException.class,
+                SqlException.class,
                 "Table with name 'PUBLIC.T1' already exists"
         ));
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
index 3f90c019be0..c9aa1ba3c82 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryRecoveryTest.java
@@ -43,7 +43,6 @@ import 
org.apache.ignite.internal.sql.engine.framework.NoOpTransactionalOperatio
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.framework.TestCluster;
 import org.apache.ignite.internal.sql.engine.framework.TestNode;
-import org.apache.ignite.internal.sql.engine.message.UnknownNodeException;
 import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
@@ -57,6 +56,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -182,9 +182,9 @@ public class QueryRecoveryTest extends 
BaseIgniteAbstractTest {
         cluster.node(firstExpectedNode).disconnect();
 
         assertThrows(
-                UnknownNodeException.class,
+                SqlException.class,
                 () -> assertQuery(gatewayNode, "SELECT node FROM t1 WHERE 
part_id = 0", txContext).check(),
-                "Unknown node: " + firstExpectedNode
+                "Node left the cluster. Node: " + firstExpectedNode
         );
     }
 
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
index 744ef0323c6..d12cc3cb8f5 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/QueryTimeoutTest.java
@@ -117,7 +117,7 @@ public class QueryTimeoutTest extends 
BaseIgniteAbstractTest {
         ignoreCatalogUpdates.set(true);
 
         assertThrows(
-                QueryCancelledException.class,
+                SqlException.class,
                 () -> gatewayNode.executeQuery(PROPS_WITH_TIMEOUT, "CREATE 
TABLE x (id INTEGER PRIMARY KEY, val INTEGER)"),
                 QueryCancelledException.TIMEOUT_MSG
         );
@@ -126,7 +126,7 @@ public class QueryTimeoutTest extends 
BaseIgniteAbstractTest {
     @Test
     void testTimeoutKill() {
         assertThrows(
-                QueryCancelledException.class,
+                SqlException.class,
                 () -> gatewayNode.executeQuery(PROPS_WITH_TIMEOUT, "KILL QUERY 
'" + randomUUID() + '\''),
                 QueryCancelledException.TIMEOUT_MSG
         );


Reply via email to