This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch ignite-25297
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit aa2827255ce9293c4d2d807c70c33dc67443fc27
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Tue Jun 3 12:58:07 2025 +0300

    IGNITE-25297 Complete script with exception if script transaction was not completed by the script.
---
 .../cli/commands/sql/ItSqlCommandTest.java         | 50 ++++++++++++
 .../sql/engine/ItSqlMultiStatementTxTest.java      | 42 +++++------
 .../sql/engine/exec/fsm/MultiStatementHandler.java | 88 ++++++++++++----------
 .../sql/engine/tx/ScriptTransactionContext.java    |  8 +-
 .../engine/tx/ScriptTransactionWrapperImpl.java    | 26 +++++--
 5 files changed, 144 insertions(+), 70 deletions(-)

diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlCommandTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlCommandTest.java
index 988facab0ea..e765798d392 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlCommandTest.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlCommandTest.java
@@ -175,4 +175,54 @@ class ItSqlCommandTest extends CliSqlCommandTestBase {
                 () -> assertErrOutputDoesNotContain("Unknown error")
         );
     }
+
+    @Test
+    @DisplayName("An error should be displayed indicating that the script transaction was not completed by the script.")
+    void scriptTxNotFinishedByScript() {
+        String expectedError = "Transaction managed by the script was not completed by the script.";
+
+        {
+            execute("sql", "START TRANSACTION;", "--jdbc-url", JDBC_URL);
+
+            assertAll(
+                    this::assertOutputIsEmpty,
+                    () -> assertErrOutputContains("SQL query execution error"),
+                    () -> assertErrOutputContains(expectedError),
+                    () -> assertErrOutputDoesNotContain("Unknown error")
+            );
+        }
+
+        {
+            execute("sql", "START TRANSACTION; SELECT 1;", "--jdbc-url", JDBC_URL);
+
+            assertAll(
+                    this::assertOutputIsEmpty,
+                    () -> assertErrOutputContains("SQL query execution error"),
+                    () -> assertErrOutputContains(expectedError),
+                    () -> assertErrOutputDoesNotContain("Unknown error")
+            );
+        }
+
+        {
+            execute("sql", "START TRANSACTION; SELECT 1; SELECT 2;", "--jdbc-url", JDBC_URL);
+
+            assertAll(
+                    this::assertOutputIsEmpty,
+                    () -> assertErrOutputContains("SQL query execution error"),
+                    () -> assertErrOutputContains(expectedError),
+                    () -> assertErrOutputDoesNotContain("Unknown error")
+            );
+        }
+
+        {
+            execute("sql", "START TRANSACTION; SELECT 1; SELECT 2;", "--jdbc-url", JDBC_URL);
+
+            assertAll(
+                    this::assertOutputIsEmpty,
+                    () -> assertErrOutputContains("SQL query execution error"),
+                    () -> assertErrOutputContains(expectedError),
+                    () -> assertErrOutputDoesNotContain("Unknown error")
+            );
+        }
+    }
 }
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
index 36dd4a786c0..35b02b05c29 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java
@@ -161,26 +161,26 @@ public class ItSqlMultiStatementTxTest extends BaseSqlMultiStatementTest {
         String startTxStatement = format("START TRANSACTION {};", txOptions);
 
         {
-            runScript(startTxStatement);
+            assertThrowsSqlException(
+                    RUNTIME_ERR,
+                    "Transaction managed by the script was not completed by the script",
+                    () -> runScript(startTxStatement)
+            );
 
             verifyFinishedTxCount(1);
         }
 
         {
-            List<AsyncSqlCursor<InternalSqlRow>> cursors = fetchAllCursors(
-                    runScript(startTxStatement
-                            + "SELECT * FROM TEST;"
-                            + "SELECT * FROM TEST;"
-                    )
+            assertThrowsSqlException(
+                    RUNTIME_ERR,
+                    "Transaction managed by the script was not completed by the script",
+                    () -> fetchAllCursors(
+                            runScript(startTxStatement
+                                    + "SELECT * FROM TEST;"
+                                    + "SELECT * FROM TEST;"
+                            ))
             );
 
-            assertThat(cursors, hasSize(3));
-
-            // The transaction depends on the cursors of the SELECT statement,
-            // so it waits for them to close.
-            assertEquals(1, txManager().pending());
-
-            cursors.forEach(AsyncSqlCursor::closeAsync);
             verifyFinishedTxCount(2);
         }
     }
@@ -213,13 +213,13 @@ public class ItSqlMultiStatementTxTest extends BaseSqlMultiStatementTest {
         assertNotNull(cur);
 
         // Fetch remaining.
-        cursors = fetchAllCursors(cur);
-        assertThat(cursors, hasSize(4));
+        AsyncSqlCursor<InternalSqlRow> cur0 = cur;
 
-        assertEquals(1, txManager().pending());
-
-        // Rollback is performed asynchronously.
-        cursors.forEach(c -> await(c.closeAsync()));
+        assertThrowsSqlException(
+                RUNTIME_ERR,
+                "Transaction managed by the script was not completed by the script",
+                () -> fetchAllCursors(cur0)
+        );
 
         // 1 COMMIT + 1 ROLLBACK.
         verifyFinishedTxCount(2);
@@ -296,7 +296,7 @@ public class ItSqlMultiStatementTxTest extends BaseSqlMultiStatementTest {
     @Test
     void dmlFailsOnReadOnlyTransaction() {
         AsyncSqlCursor<InternalSqlRow> cursor = runScript("START TRANSACTION READ ONLY;"
-                + "SELECT 1;"
+                + "SELECT x FROM TABLE(SYSTEM_RANGE(1, 1000000));"
                 + "INSERT INTO test VALUES(0);"
                 + "COMMIT;");
 
@@ -306,7 +306,7 @@ public class ItSqlMultiStatementTxTest extends BaseSqlMultiStatementTest {
         assertThrowsSqlException(RUNTIME_ERR, "DML cannot be started by using read only transactions.",
                 () -> await(insCur.nextResult()));
 
-        expectQueryCancelled(() -> await(insCur.requestNextAsync(1)));
+        expectQueryCancelled(new DrainCursor(insCur));
 
         verifyFinishedTxCount(1);
     }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
index 0ec94bbf48f..f87a724336e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
 import org.apache.ignite.internal.sql.engine.tx.ScriptTransactionContext;
 import org.apache.ignite.internal.sql.engine.util.IteratorToDataCursorAdapter;
+import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.Nullable;
@@ -180,6 +181,18 @@ class MultiStatementHandler {
                     return;
                 }
 
+                // Check if the script does not have a commit of the active transaction at the end of the script.
+                if (lastStatement && scriptTxContext.rollbackActiveTx()) {
+                    SqlException ex0 = new SqlException(Sql.RUNTIME_ERR,
+                            "Transaction managed by the script was not completed by the script.");
+
+                    cursorFuture.completeExceptionally(ex0);
+
+                    cancelAll(ex0);
+
+                    return;
+                }
+
                 cursorFuture.complete(cursor);
 
                 if (!cursor.onClose().isDone()) {
@@ -187,58 +200,57 @@ class MultiStatementHandler {
                 }
 
                 if (lastStatement) {
-                    // Try to rollback script managed transaction, if any.
-                    scriptTxContext.rollbackUncommitted();
-
                     // Main program is completed, therefore it's safe to schedule termination of a query
                     query.resultHolder
                             .thenRun(this::scheduleTermination);
+
+                    return;
+                }
+
+                CompletableFuture<Void> triggerFuture;
+                ScriptStatement nextStatement = statements.peek();
+
+                if (implicitTx) {
+                    if (cursor.queryType() != SqlQueryType.QUERY) {
+                        triggerFuture = cursor.onFirstPageReady();
+                    } else {
+                        triggerFuture = nullCompletedFuture();
+                    }
                 } else {
-                    CompletableFuture<Void> triggerFuture;
-                    ScriptStatement nextStatement = statements.peek();
+                    if (cursor.queryType() == SqlQueryType.QUERY) {
+                        inFlightSelects.add(CompletableFuture.anyOf(
+                                cursor.onClose(), cursor.onFirstPageReady()
+                        ).handle((r, e) -> null));
+
+                        if (nextStatement != null && nextStatement.parsedResult.queryType() == SqlQueryType.DML) {
+                            // we need to postpone DML until first page will be ready for every SELECT operation
+                            // prior to that DML
+                            triggerFuture = CompletableFuture.allOf(inFlightSelects.toArray(CompletableFuture[]::new));
 
-                    if (implicitTx) {
-                        if (cursor.queryType() != SqlQueryType.QUERY) {
-                            triggerFuture = cursor.onFirstPageReady();
+                            inFlightSelects.clear();
                         } else {
                             triggerFuture = nullCompletedFuture();
                         }
                     } else {
-                        if (cursor.queryType() == SqlQueryType.QUERY) {
-                            inFlightSelects.add(CompletableFuture.anyOf(
-                                    cursor.onClose(), cursor.onFirstPageReady()
-                            ).handle((r, e) -> null));
-
-                            if (nextStatement != null && nextStatement.parsedResult.queryType() == SqlQueryType.DML) {
-                                // we need to postpone DML until first page will be ready for every SELECT operation
-                                // prior to that DML
-                                triggerFuture = CompletableFuture.allOf(inFlightSelects.toArray(CompletableFuture[]::new));
-
-                                inFlightSelects.clear();
-                            } else {
-                                triggerFuture = nullCompletedFuture();
-                            }
-                        } else {
-                            CompletableFuture<Void> prefetchFuture = cursor.onFirstPageReady();
+                        CompletableFuture<Void> prefetchFuture = cursor.onFirstPageReady();
 
-                            // for non query statements cursor should not be resolved until the very first page is ready.
-                            // if prefetch was completed exceptionally, then cursor future is expected to be completed
-                            // exceptionally as well, resulting in the early return in the very beginning of the `whenComplete`
-                            assert prefetchFuture.isDone() && !prefetchFuture.isCompletedExceptionally()
-                                    : "prefetch future is expected to be completed successfully, but was "
-                                    + (prefetchFuture.isDone() ? "completed exceptionally" : "not completed");
+                        // for non query statements cursor should not be resolved until the very first page is ready.
+                        // if prefetch was completed exceptionally, then cursor future is expected to be completed
+                        // exceptionally as well, resulting in the early return in the very beginning of the `whenComplete`
+                        assert prefetchFuture.isDone() && !prefetchFuture.isCompletedExceptionally()
+                                : "prefetch future is expected to be completed successfully, but was "
+                                + (prefetchFuture.isDone() ? "completed exceptionally" : "not completed");
 
-                            triggerFuture = nullCompletedFuture();
-                        }
+                        triggerFuture = nullCompletedFuture();
                     }
+                }
 
-                    triggerFuture.thenRunAsync(this::processNext, query.executor::execute)
-                            .exceptionally(e -> {
-                                cancelAll(e);
+                triggerFuture.thenRunAsync(this::processNext, query.executor::execute)
+                        .exceptionally(e -> {
+                            cancelAll(e);
 
-                                return null;
-                            });
-                }
+                            return null;
+                        });
             });
         } catch (TxControlInsideExternalTxNotSupportedException txEx) {
             scriptTxContext.onError(txEx);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
index a27569df6fd..6179d4b1bb0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
@@ -137,13 +137,11 @@ public class ScriptTransactionContext implements QueryTransactionContext {
         }
     }
 
-    /** Attempts to rollback a script-driven transaction if it has not finished. */
-    public void rollbackUncommitted() {
+    /** Returns {@code True} if the script transaction exists and its rollback has been initiated by this call. */
+    public boolean rollbackActiveTx() {
         ScriptTransactionWrapperImpl txWrapper = wrapper;
 
-        if (txWrapper != null) {
-            txWrapper.rollbackWhenCursorsClosed();
-        }
+        return txWrapper != null && txWrapper.rollback();
     }
 
     /** Rolls back the script-driven transaction. */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
index 5b66b1cb829..3b8b5270081 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
@@ -129,9 +129,21 @@ class ScriptTransactionWrapperImpl implements QueryTransactionWrapper {
         });
     }
 
-    /** Rolls back the transaction when all cursors are closed. */
-    void rollbackWhenCursorsClosed() {
-        changeState(State.ROLLBACK);
+    /**
+     * Rolls back the transaction.
+     *
+     * @return {@code True} if the rollback was initiated by this call, {@code False} if the transaction completion was already initiated.
+     */
+    boolean rollback() {
+        if (changeState(State.ROLLBACK)) {
+            if (!completedTx.get()) {
+                completeTx();
+            }
+
+            return true;
+        }
+
+        return false;
     }
 
     /** Registers a new cursor associated with the current transaction. */
@@ -166,20 +178,22 @@ class ScriptTransactionWrapperImpl implements 
QueryTransactionWrapper {
         });
     }
 
-    private void changeState(State newState) {
+    private boolean changeState(State newState) {
         synchronized (mux) {
             if (txState != null) {
-                return;
+                return false;
             }
 
             txState = newState;
 
             if (!openedCursors.isEmpty()) {
-                return;
+                return true;
             }
         }
 
         completeTx();
+
+        return true;
     }
 
     private void completeTx() {

Reply via email to