This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch ignite-25297 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit aa2827255ce9293c4d2d807c70c33dc67443fc27 Author: Pavel Pereslegin <[email protected]> AuthorDate: Tue Jun 3 12:58:07 2025 +0300 IGNITE-25297 Complete script with exception if script transaction was not completed by the script. --- .../cli/commands/sql/ItSqlCommandTest.java | 50 ++++++++++++ .../sql/engine/ItSqlMultiStatementTxTest.java | 42 +++++------ .../sql/engine/exec/fsm/MultiStatementHandler.java | 88 ++++++++++++---------- .../sql/engine/tx/ScriptTransactionContext.java | 8 +- .../engine/tx/ScriptTransactionWrapperImpl.java | 26 +++++-- 5 files changed, 144 insertions(+), 70 deletions(-) diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlCommandTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlCommandTest.java index 988facab0ea..e765798d392 100644 --- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlCommandTest.java +++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlCommandTest.java @@ -175,4 +175,54 @@ class ItSqlCommandTest extends CliSqlCommandTestBase { () -> assertErrOutputDoesNotContain("Unknown error") ); } + + @Test + @DisplayName("An error should be displayed indicating that the script transaction was not completed by the script.") + void scriptTxNotFinishedByScript() { + String expectedError = "Transaction managed by the script was not completed by the script."; + + { + execute("sql", "START TRANSACTION;", "--jdbc-url", JDBC_URL); + + assertAll( + this::assertOutputIsEmpty, + () -> assertErrOutputContains("SQL query execution error"), + () -> assertErrOutputContains(expectedError), + () -> assertErrOutputDoesNotContain("Unknown error") + ); + } + + { + execute("sql", "START TRANSACTION; SELECT 1;", "--jdbc-url", JDBC_URL); + + assertAll( + this::assertOutputIsEmpty, + () -> assertErrOutputContains("SQL query execution error"), + () -> assertErrOutputContains(expectedError), + () -> assertErrOutputDoesNotContain("Unknown error") + ); + } + + { + execute("sql", "START TRANSACTION; SELECT 1; SELECT 2;", "--jdbc-url", JDBC_URL); + + assertAll( + this::assertOutputIsEmpty, + () -> assertErrOutputContains("SQL query execution error"), + () -> assertErrOutputContains(expectedError), + () -> assertErrOutputDoesNotContain("Unknown error") + ); + } + + { + execute("sql", "START TRANSACTION; SELECT 1; SELECT 2;", "--jdbc-url", JDBC_URL); + + assertAll( + this::assertOutputIsEmpty, + () -> assertErrOutputContains("SQL query execution error"), + () -> assertErrOutputContains(expectedError), + () -> assertErrOutputDoesNotContain("Unknown error") + ); + } + } } diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java index 36dd4a786c0..35b02b05c29 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTxTest.java @@ -161,26 +161,26 @@ public class ItSqlMultiStatementTxTest extends BaseSqlMultiStatementTest { String startTxStatement = format("START TRANSACTION {};", txOptions); { - runScript(startTxStatement); + assertThrowsSqlException( + RUNTIME_ERR, + "Transaction managed by the script was not completed by the script", + () -> runScript(startTxStatement) + ); verifyFinishedTxCount(1); } { - List<AsyncSqlCursor<InternalSqlRow>> cursors = fetchAllCursors( - runScript(startTxStatement - + "SELECT * FROM TEST;" - + "SELECT * FROM TEST;" - ) + assertThrowsSqlException( + RUNTIME_ERR, + "Transaction managed by the script was not completed by the script", + () -> fetchAllCursors( + runScript(startTxStatement + + "SELECT * FROM TEST;" + + "SELECT * FROM TEST;" + )) ); - assertThat(cursors, hasSize(3)); - - // The transaction depends on the cursors of the SELECT statement, - // so it waits for them to close. - assertEquals(1, txManager().pending()); - - cursors.forEach(AsyncSqlCursor::closeAsync); verifyFinishedTxCount(2); } } @@ -213,13 +213,13 @@ public class ItSqlMultiStatementTxTest extends BaseSqlMultiStatementTest { assertNotNull(cur); // Fetch remaining. - cursors = fetchAllCursors(cur); - assertThat(cursors, hasSize(4)); + AsyncSqlCursor<InternalSqlRow> cur0 = cur; - assertEquals(1, txManager().pending()); - - // Rollback is performed asynchronously. - cursors.forEach(c -> await(c.closeAsync())); + assertThrowsSqlException( + RUNTIME_ERR, + "Transaction managed by the script was not completed by the script", + () -> fetchAllCursors(cur0) + ); // 1 COMMIT + 1 ROLLBACK. verifyFinishedTxCount(2); @@ -296,7 +296,7 @@ public class ItSqlMultiStatementTxTest extends BaseSqlMultiStatementTest { @Test void dmlFailsOnReadOnlyTransaction() { AsyncSqlCursor<InternalSqlRow> cursor = runScript("START TRANSACTION READ ONLY;" - + "SELECT 1;" + + "SELECT x FROM TABLE(SYSTEM_RANGE(1, 1000000));" + "INSERT INTO test VALUES(0);" + "COMMIT;"); @@ -306,7 +306,7 @@ public class ItSqlMultiStatementTxTest extends BaseSqlMultiStatementTest { assertThrowsSqlException(RUNTIME_ERR, "DML cannot be started by using read only transactions.", () -> await(insCur.nextResult())); - expectQueryCancelled(() -> await(insCur.requestNextAsync(1))); + expectQueryCancelled(new DrainCursor(insCur)); verifyFinishedTxCount(1); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java index 0ec94bbf48f..f87a724336e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/MultiStatementHandler.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.sql.engine.sql.ParsedResult; import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext; import org.apache.ignite.internal.sql.engine.tx.ScriptTransactionContext; import org.apache.ignite.internal.sql.engine.util.IteratorToDataCursorAdapter; +import org.apache.ignite.lang.ErrorGroups.Sql; import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.SqlException; import org.jetbrains.annotations.Nullable; @@ -180,6 +181,18 @@ class MultiStatementHandler { return; } + // Check if the script does not have a commit of the active transaction at the end of the script. + if (lastStatement && scriptTxContext.rollbackActiveTx()) { + SqlException ex0 = new SqlException(Sql.RUNTIME_ERR, + "Transaction managed by the script was not completed by the script."); + + cursorFuture.completeExceptionally(ex0); + + cancelAll(ex0); + + return; + } + cursorFuture.complete(cursor); if (!cursor.onClose().isDone()) { @@ -187,58 +200,57 @@ class MultiStatementHandler { } if (lastStatement) { - // Try to rollback script managed transaction, if any. - scriptTxContext.rollbackUncommitted(); - // Main program is completed, therefore it's safe to schedule termination of a query query.resultHolder .thenRun(this::scheduleTermination); + + return; + } + + CompletableFuture<Void> triggerFuture; + ScriptStatement nextStatement = statements.peek(); + + if (implicitTx) { + if (cursor.queryType() != SqlQueryType.QUERY) { + triggerFuture = cursor.onFirstPageReady(); + } else { + triggerFuture = nullCompletedFuture(); + } } else { - CompletableFuture<Void> triggerFuture; - ScriptStatement nextStatement = statements.peek(); + if (cursor.queryType() == SqlQueryType.QUERY) { + inFlightSelects.add(CompletableFuture.anyOf( + cursor.onClose(), cursor.onFirstPageReady() + ).handle((r, e) -> null)); + + if (nextStatement != null && nextStatement.parsedResult.queryType() == SqlQueryType.DML) { + // we need to postpone DML until first page will be ready for every SELECT operation + // prior to that DML + triggerFuture = CompletableFuture.allOf(inFlightSelects.toArray(CompletableFuture[]::new)); - if (implicitTx) { - if (cursor.queryType() != SqlQueryType.QUERY) { - triggerFuture = cursor.onFirstPageReady(); + inFlightSelects.clear(); } else { triggerFuture = nullCompletedFuture(); } } else { - if (cursor.queryType() == SqlQueryType.QUERY) { - inFlightSelects.add(CompletableFuture.anyOf( - cursor.onClose(), cursor.onFirstPageReady() - ).handle((r, e) -> null)); - - if (nextStatement != null && nextStatement.parsedResult.queryType() == SqlQueryType.DML) { - // we need to postpone DML until first page will be ready for every SELECT operation - // prior to that DML - triggerFuture = CompletableFuture.allOf(inFlightSelects.toArray(CompletableFuture[]::new)); - - inFlightSelects.clear(); - } else { - triggerFuture = nullCompletedFuture(); - } - } else { - CompletableFuture<Void> prefetchFuture = cursor.onFirstPageReady(); + CompletableFuture<Void> prefetchFuture = cursor.onFirstPageReady(); - // for non query statements cursor should not be resolved until the very first page is ready. - // if prefetch was completed exceptionally, then cursor future is expected to be completed - // exceptionally as well, resulting in the early return in the very beginning of the `whenComplete` - assert prefetchFuture.isDone() && !prefetchFuture.isCompletedExceptionally() - : "prefetch future is expected to be completed successfully, but was " - + (prefetchFuture.isDone() ? "completed exceptionally" : "not completed"); + // for non query statements cursor should not be resolved until the very first page is ready. + // if prefetch was completed exceptionally, then cursor future is expected to be completed + // exceptionally as well, resulting in the early return in the very beginning of the `whenComplete` + assert prefetchFuture.isDone() && !prefetchFuture.isCompletedExceptionally() + : "prefetch future is expected to be completed successfully, but was " + + (prefetchFuture.isDone() ? "completed exceptionally" : "not completed"); - triggerFuture = nullCompletedFuture(); - } + triggerFuture = nullCompletedFuture(); } + } - triggerFuture.thenRunAsync(this::processNext, query.executor::execute) - .exceptionally(e -> { - cancelAll(e); + triggerFuture.thenRunAsync(this::processNext, query.executor::execute) + .exceptionally(e -> { + cancelAll(e); - return null; - }); - } + return null; + }); }); } catch (TxControlInsideExternalTxNotSupportedException txEx) { scriptTxContext.onError(txEx); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java index a27569df6fd..6179d4b1bb0 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java @@ -137,13 +137,11 @@ public class ScriptTransactionContext implements QueryTransactionContext { } } - /** Attempts to rollback a script-driven transaction if it has not finished. */ - public void rollbackUncommitted() { + /** Returns {@code True} if the script transaction exists and its rollback has been initiated by this call. */ + public boolean rollbackActiveTx() { ScriptTransactionWrapperImpl txWrapper = wrapper; - if (txWrapper != null) { - txWrapper.rollbackWhenCursorsClosed(); - } + return txWrapper != null && txWrapper.rollback(); } /** Rolls back the script-driven transaction. */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java index 5b66b1cb829..3b8b5270081 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java @@ -129,9 +129,21 @@ class ScriptTransactionWrapperImpl implements QueryTransactionWrapper { }); } - /** Rolls back the transaction when all cursors are closed. */ - void rollbackWhenCursorsClosed() { - changeState(State.ROLLBACK); + /** + * Rolls back the transaction. + * + * @return {@code True} if the rollback was initiated by this call, {@code False} if the transaction completion was already initiated. + */ + boolean rollback() { + if (changeState(State.ROLLBACK)) { + if (!completedTx.get()) { + completeTx(); + } + + return true; + } + + return false; } /** Registers a new cursor associated with the current transaction. */ @@ -166,20 +178,22 @@ class ScriptTransactionWrapperImpl implements QueryTransactionWrapper { }); } - private void changeState(State newState) { + private boolean changeState(State newState) { synchronized (mux) { if (txState != null) { - return; + return false; } txState = newState; if (!openedCursors.isEmpty()) { - return; + return true; } } completeTx(); + + return true; } private void completeTx() {
