This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e2f1c24ae IGNITE-21868 Moved the sql RO inflights handling from 
SqlQueryProcessor to QueryTransactionContext and QueryTransactionWrapper (#3511)
5e2f1c24ae is described below

commit 5e2f1c24aec704ea34a2f350bbc364e5f922d4a3
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Jun 7 16:51:56 2024 +0300

    IGNITE-21868 Moved the sql RO inflights handling from SqlQueryProcessor to 
QueryTransactionContext and QueryTransactionWrapper (#3511)
---
 .../internal/sql/engine/SqlQueryProcessor.java     |  12 +-
 .../sql/engine/tx/QueryTransactionContextImpl.java |  30 ++++-
 .../sql/engine/tx/QueryTransactionWrapperImpl.java |  20 +++-
 .../sql/engine/tx/ScriptTransactionContext.java    |   8 +-
 .../engine/tx/ScriptTransactionWrapperImpl.java    |  13 ++-
 .../engine/QueryTransactionWrapperSelfTest.java    | 128 ++++++++++++++++++---
 .../sql/engine/exec/TxAwareCursorSelfTest.java     |   7 +-
 .../sql/engine/framework/ExplicitTxContext.java    |  16 ++-
 .../sql/engine/framework/ImplicitTxContext.java    |  16 ++-
 .../internal/tx/impl/TransactionInflights.java     |   8 ++
 10 files changed, 227 insertions(+), 31 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index c2e69f48fc..3f52ec6837 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -228,6 +228,8 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     private final TxManager txManager;
 
+    private final TransactionInflights transactionInflights;
+
     /** Constructor. */
     public SqlQueryProcessor(
             ClusterService clusterSrvc,
@@ -265,6 +267,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         this.placementDriver = placementDriver;
         this.clusterCfg = clusterCfg;
         this.nodeCfg = nodeCfg;
+        this.transactionInflights = transactionInflights;
         this.txManager = txManager;
 
         sqlSchemaManager = new SqlSchemaManagerImpl(
@@ -499,7 +502,8 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         try {
             SqlProperties properties0 = SqlPropertiesHelper.chain(properties, 
DEFAULT_PROPERTIES);
-            QueryTransactionContext txContext = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, transaction); 
+            QueryTransactionContext txContext = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, transaction,
+                    transactionInflights);
 
             if (Commons.isMultiStatementQueryAllowed(properties0)) {
                 return queryScript(properties0, txContext, qry, params);
@@ -566,7 +570,7 @@ public class SqlQueryProcessor implements QueryProcessor {
             validateParsedStatement(properties, result);
             validateDynamicParameters(result.dynamicParamsCount(), params, 
true);
 
-            HybridTimestamp operationTime = deriveOperationTime(txContext); 
+            HybridTimestamp operationTime = deriveOperationTime(txContext);
 
             SqlOperationContext operationContext = 
SqlOperationContext.builder()
                     .queryId(UUID.randomUUID())
@@ -620,7 +624,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                             .queryId(UUID.randomUUID())
                             // time zone is used in execution phase,
                             // so we may use any time zone for preparation only
-                            .timeZoneId(DEFAULT_TIME_ZONE_ID) 
+                            .timeZoneId(DEFAULT_TIME_ZONE_ID)
                             .defaultSchemaName(schemaName)
                             .operationTime(timestamp)
                             .cancel(queryCancel)
@@ -850,7 +854,7 @@ public class SqlQueryProcessor implements QueryProcessor {
             this.timeZoneId = timeZoneId;
             this.schemaName = schemaName;
             this.statements = prepareStatementsQueue(parsedResults, params);
-            this.scriptTxContext = new ScriptTransactionContext(txContext);
+            this.scriptTxContext = new ScriptTransactionContext(txContext, 
transactionInflights);
         }
 
         /**
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java
index b40e4a6951..b1c12873f8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java
@@ -17,10 +17,15 @@
 
 package org.apache.ignite.internal.sql.engine.tx;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
+
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
+import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -30,16 +35,19 @@ public class QueryTransactionContextImpl implements 
QueryTransactionContext {
     private final TxManager txManager;
     private final HybridTimestampTracker observableTimeTracker;
     private final @Nullable QueryTransactionWrapper tx;
+    private final TransactionInflights transactionInflights;
 
     /** Constructor. */
     public QueryTransactionContextImpl(
             TxManager txManager,
             HybridTimestampTracker observableTimeTracker,
-            @Nullable InternalTransaction tx
+            @Nullable InternalTransaction tx,
+            TransactionInflights transactionInflights
     ) {
         this.txManager = txManager;
         this.observableTimeTracker = observableTimeTracker;
-        this.tx = tx != null ? new QueryTransactionWrapperImpl(tx, false) : 
null;
+        this.tx = tx != null ? new QueryTransactionWrapperImpl(tx, false, 
transactionInflights) : null;
+        this.transactionInflights = transactionInflights;
     }
 
     /**
@@ -50,13 +58,23 @@ public class QueryTransactionContextImpl implements 
QueryTransactionContext {
      */
     @Override
     public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
+        InternalTransaction transaction;
+        QueryTransactionWrapper result;
+
         if (tx == null) {
-            return new QueryTransactionWrapperImpl(
-                    txManager.begin(observableTimeTracker, readOnly), true
-            );
+            transaction = txManager.begin(observableTimeTracker, readOnly);
+            result = new QueryTransactionWrapperImpl(transaction, true, 
transactionInflights);
+        } else {
+            transaction = tx.unwrap();
+            result = tx;
         }
 
-        return tx;
+        // Adding inflights only for read-only transactions. See 
TransactionInflights.ReadOnlyTxContext for details.
+        if (transaction.isReadOnly() && 
!transactionInflights.addInflight(transaction.id(), transaction.isReadOnly())) {
+            throw new TransactionException(TX_ALREADY_FINISHED_ERR, 
format("Transaction is already finished [tx={}]", transaction));
+        }
+
+        return result;
     }
 
     @Override
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionWrapperImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionWrapperImpl.java
index 252ba2bb99..01d626f55e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionWrapperImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionWrapperImpl.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.sql.engine.tx;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 
 /**
  * Wrapper for the transaction that encapsulates the management of an implicit 
transaction.
@@ -30,9 +32,21 @@ public class QueryTransactionWrapperImpl implements 
QueryTransactionWrapper {
 
     private final InternalTransaction transaction;
 
-    public QueryTransactionWrapperImpl(InternalTransaction transaction, 
boolean implicit) {
+    private final TransactionInflights transactionInflights;
+
+    private final AtomicBoolean committedImplicit = new AtomicBoolean();
+
+    /**
+     * Constructor.
+     *
+     * @param transaction Transaction.
+     * @param implicit Whether tx is implicit.
+     * @param transactionInflights Transaction inflights.
+     */
+    public QueryTransactionWrapperImpl(InternalTransaction transaction, 
boolean implicit, TransactionInflights transactionInflights) {
         this.transaction = transaction;
         this.implicit = implicit;
+        this.transactionInflights = transactionInflights;
     }
 
     @Override
@@ -42,6 +56,10 @@ public class QueryTransactionWrapperImpl implements 
QueryTransactionWrapper {
 
     @Override
     public CompletableFuture<Void> commitImplicit() {
+        if (transaction.isReadOnly() && committedImplicit.compareAndSet(false, 
true)) {
+            transactionInflights.removeInflight(transaction.id());
+        }
+
         if (implicit) {
             return transaction.commitAsync();
         }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
index 24e8c3cee3..1fedd2bc1d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
@@ -31,6 +31,7 @@ import 
org.apache.ignite.internal.sql.engine.sql.IgniteSqlCommitTransaction;
 import org.apache.ignite.internal.sql.engine.sql.IgniteSqlStartTransaction;
 import org.apache.ignite.internal.sql.engine.sql.IgniteSqlStartTransactionMode;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.sql.SqlException;
 import org.jetbrains.annotations.Nullable;
 
@@ -40,13 +41,16 @@ import org.jetbrains.annotations.Nullable;
 public class ScriptTransactionContext implements QueryTransactionContext {
     private final QueryTransactionContextImpl txContext;
 
+    private final TransactionInflights transactionInflights;
+
     private volatile @Nullable ScriptTransactionWrapperImpl wrapper;
 
     /** Constructor. */
-    public ScriptTransactionContext(QueryTransactionContext txContext) {
+    public ScriptTransactionContext(QueryTransactionContext txContext, 
TransactionInflights transactionInflights) {
         assert txContext instanceof QueryTransactionContextImpl : txContext;
 
         this.txContext = (QueryTransactionContextImpl) txContext;
+        this.transactionInflights = transactionInflights;
     }
 
     /**
@@ -104,7 +108,7 @@ public class ScriptTransactionContext implements 
QueryTransactionContext {
             boolean readOnly = ((IgniteSqlStartTransaction) node).getMode() == 
IgniteSqlStartTransactionMode.READ_ONLY;
             InternalTransaction tx = 
txContext.getOrStartImplicit(readOnly).unwrap();
 
-            this.wrapper = new ScriptTransactionWrapperImpl(tx);
+            this.wrapper = new ScriptTransactionWrapperImpl(tx, 
transactionInflights);
 
             return nullCompletedFuture();
         } else {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
index 6257570422..2252cd995a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
@@ -26,10 +26,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
 import org.apache.ignite.internal.sql.engine.InternalSqlRow;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.sql.SqlException;
 
@@ -62,8 +64,13 @@ class ScriptTransactionWrapperImpl implements 
QueryTransactionWrapper {
 
     private Throwable rollbackCause;
 
-    ScriptTransactionWrapperImpl(InternalTransaction managedTx) {
+    private final TransactionInflights transactionInflights;
+
+    private final AtomicBoolean completedTx = new AtomicBoolean();
+
+    ScriptTransactionWrapperImpl(InternalTransaction managedTx, 
TransactionInflights transactionInflights) {
         this.managedTx = managedTx;
+        this.transactionInflights = transactionInflights;
     }
 
     @Override
@@ -184,6 +191,10 @@ class ScriptTransactionWrapperImpl implements 
QueryTransactionWrapper {
             default:
                 throw new IllegalStateException("Unknown transaction target 
state: " + txState);
         }
+
+        if (managedTx.isReadOnly() && completedTx.compareAndSet(false, true)) {
+            transactionInflights.removeInflight(managedTx.id());
+        }
     }
 
     private void completeTxFuture(Void unused, Throwable e) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
index 6f85451754..f12c5615cb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
@@ -20,15 +20,23 @@ package org.apache.ignite.internal.sql.engine;
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
+import org.apache.ignite.internal.sql.engine.sql.IgniteSqlCommitTransaction;
 import org.apache.ignite.internal.sql.engine.sql.IgniteSqlStartTransaction;
+import org.apache.ignite.internal.sql.engine.sql.IgniteSqlStartTransactionMode;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContextImpl;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
@@ -37,6 +45,7 @@ import 
org.apache.ignite.internal.sql.engine.tx.ScriptTransactionContext;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.lang.ErrorGroups.Sql;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -50,20 +59,21 @@ import org.mockito.junit.jupiter.MockitoExtension;
 public class QueryTransactionWrapperSelfTest extends BaseIgniteAbstractTest {
     @Mock
     private HybridTimestampTracker observableTimeTracker;
+
     @Mock
     private TxManager txManager;
 
+    @Mock
+    private TransactionInflights transactionInflights;
+
     @Test
     public void testImplicitTransactionAttributes() {
-        when(txManager.begin(any(), anyBoolean())).thenAnswer(
-                inv -> {
-                    boolean readOnly = inv.getArgument(1, Boolean.class);
+        prepareTransactionsMocks();
 
-                    return readOnly ? NoOpTransaction.readOnly("test-ro") : 
NoOpTransaction.readWrite("test-rw");
-                }
-        );
+        when(transactionInflights.addInflight(any(), 
anyBoolean())).thenAnswer(inv -> true);
 
-        QueryTransactionContext transactionHandler = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, null);
+        QueryTransactionContext transactionHandler = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
+                transactionInflights);
         QueryTransactionWrapper transactionWrapper = 
transactionHandler.getOrStartImplicit(false);
 
         assertThat(transactionWrapper.unwrap().isReadOnly(), equalTo(false));
@@ -76,7 +86,7 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
     public void commitImplicitTxNotAffectExternalTransaction() {
         NoOpTransaction externalTx = new NoOpTransaction("test");
 
-        QueryTransactionWrapperImpl wrapper = new 
QueryTransactionWrapperImpl(externalTx, false);
+        QueryTransactionWrapperImpl wrapper = new 
QueryTransactionWrapperImpl(externalTx, false, transactionInflights);
         wrapper.commitImplicit();
         assertFalse(externalTx.commitFuture().isDone());
     }
@@ -84,7 +94,7 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testCommitImplicit() {
         NoOpTransaction tx = new NoOpTransaction("test");
-        QueryTransactionWrapperImpl wrapper = new 
QueryTransactionWrapperImpl(tx, true);
+        QueryTransactionWrapperImpl wrapper = new 
QueryTransactionWrapperImpl(tx, true, transactionInflights);
 
         wrapper.commitImplicit();
 
@@ -95,7 +105,7 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testRollbackImplicit() {
         NoOpTransaction tx = new NoOpTransaction("test");
-        QueryTransactionWrapperImpl wrapper = new 
QueryTransactionWrapperImpl(tx, true);
+        QueryTransactionWrapperImpl wrapper = new 
QueryTransactionWrapperImpl(tx, true, transactionInflights);
 
         wrapper.rollback(null);
 
@@ -106,7 +116,9 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
     @Test
     public void 
throwsExceptionForTxControlStatementInsideExternalTransaction() {
         ScriptTransactionContext txCtx = new ScriptTransactionContext(
-                new QueryTransactionContextImpl(txManager, 
observableTimeTracker, new NoOpTransaction("test")));
+                new QueryTransactionContextImpl(txManager, 
observableTimeTracker, new NoOpTransaction("test"), transactionInflights),
+                transactionInflights
+        );
 
         
assertThrowsExactly(TxControlInsideExternalTxNotSupportedException.class, () -> 
txCtx.handleControlStatement(null));
     }
@@ -114,11 +126,12 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
     @Test
     public void throwsExceptionForNestedScriptTransaction() {
         ScriptTransactionContext txCtx = new ScriptTransactionContext(
-                new QueryTransactionContextImpl(txManager, 
observableTimeTracker, null)
+                new QueryTransactionContextImpl(txManager, 
observableTimeTracker, null, transactionInflights),
+                transactionInflights
         );
         IgniteSqlStartTransaction txStartStmt = 
mock(IgniteSqlStartTransaction.class);
 
-        when(txManager.begin(any(), anyBoolean())).thenReturn(new 
NoOpTransaction("test"));
+        when(txManager.begin(any(), 
anyBoolean())).thenReturn(NoOpTransaction.readWrite("test"));
 
         txCtx.handleControlStatement(txStartStmt);
 
@@ -129,4 +142,93 @@ public class QueryTransactionWrapperSelfTest extends 
BaseIgniteAbstractTest {
                 () -> txCtx.handleControlStatement(txStartStmt)
         );
     }
+
+    @Test
+    public void testQueryTransactionWrapperTxInflightsInteraction() {
+        Set<UUID> inflights = new HashSet<>();
+
+        prepareTxInflightsMocks(inflights);
+
+        prepareTransactionsMocks();
+
+        QueryTransactionContext implicitDmlTxCtx = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
+                transactionInflights);
+        implicitDmlTxCtx.getOrStartImplicit(false);
+        // Check that RW txns do not create tx inflights.
+        assertTrue(inflights.isEmpty());
+
+        QueryTransactionContext implicitQueryTxCtx = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
+                transactionInflights);
+        QueryTransactionWrapper implicitQueryTxWrapper = 
implicitQueryTxCtx.getOrStartImplicit(true);
+        assertTrue(inflights.contains(implicitQueryTxWrapper.unwrap().id()));
+        implicitQueryTxWrapper.commitImplicit();
+        assertTrue(inflights.isEmpty());
+
+        NoOpTransaction rwTx = NoOpTransaction.readWrite("test-rw");
+        QueryTransactionContext explicitRwTxCtx = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, rwTx,
+                transactionInflights);
+        explicitRwTxCtx.getOrStartImplicit(true);
+        // Check that RW txns do not create tx inflights.
+        assertTrue(inflights.isEmpty());
+
+        NoOpTransaction roTx = NoOpTransaction.readOnly("test-ro");
+        QueryTransactionContext explicitRoTxCtx = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, roTx,
+                transactionInflights);
+        QueryTransactionWrapper explicitRoTxWrapper = 
explicitRoTxCtx.getOrStartImplicit(true);
+        assertTrue(inflights.contains(explicitRoTxWrapper.unwrap().id()));
+        explicitRoTxWrapper.commitImplicit();
+        assertTrue(inflights.isEmpty());
+    }
+
+    @Test
+    public void testScriptTransactionWrapperTxInflightsInteraction() {
+        Set<UUID> inflights = new HashSet<>();
+
+        prepareTxInflightsMocks(inflights);
+
+        prepareTransactionsMocks();
+
+        QueryTransactionContext txCtx = new 
QueryTransactionContextImpl(txManager, observableTimeTracker, null, 
transactionInflights);
+        ScriptTransactionContext scriptRwTxCtx = new 
ScriptTransactionContext(txCtx, transactionInflights);
+
+        IgniteSqlStartTransaction sqlStartRwTx = 
mock(IgniteSqlStartTransaction.class);
+        when(sqlStartRwTx.getMode()).thenAnswer(inv -> 
IgniteSqlStartTransactionMode.READ_WRITE);
+
+        scriptRwTxCtx.handleControlStatement(sqlStartRwTx);
+        assertTrue(inflights.isEmpty());
+
+        ScriptTransactionContext scriptRoTxCtx = new 
ScriptTransactionContext(txCtx, transactionInflights);
+        IgniteSqlStartTransaction sqlStartRoTx = 
mock(IgniteSqlStartTransaction.class);
+        when(sqlStartRoTx.getMode()).thenAnswer(inv -> 
IgniteSqlStartTransactionMode.READ_ONLY);
+
+        scriptRoTxCtx.handleControlStatement(sqlStartRoTx);
+        assertEquals(1, inflights.size());
+
+        QueryTransactionWrapper wrapper = 
scriptRoTxCtx.getOrStartImplicit(true);
+        assertEquals(1, inflights.size());
+
+        // ScriptTransactionWrapperImpl.commitImplicit is noop.
+        wrapper.commitImplicit();
+        assertEquals(1, inflights.size());
+
+        IgniteSqlCommitTransaction sqlCommitTx = 
mock(IgniteSqlCommitTransaction.class);
+        scriptRoTxCtx.handleControlStatement(sqlCommitTx);
+        assertTrue(inflights.isEmpty());
+    }
+
+    private void prepareTransactionsMocks() {
+        when(txManager.begin(any(), anyBoolean())).thenAnswer(
+                inv -> {
+                    boolean readOnly = inv.getArgument(1, Boolean.class);
+
+                    return readOnly ? NoOpTransaction.readOnly("test-ro") : 
NoOpTransaction.readWrite("test-rw");
+                }
+        );
+    }
+
+    private void prepareTxInflightsMocks(Set<UUID> inflights) {
+        when(transactionInflights.addInflight(any(), 
anyBoolean())).thenAnswer(inv -> inflights.add(inv.getArgument(0)));
+
+        doAnswer(inv -> 
inflights.remove(inv.getArgument(0))).when(transactionInflights).removeInflight(any());
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TxAwareCursorSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TxAwareCursorSelfTest.java
index 904202895e..7889639379 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TxAwareCursorSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TxAwareCursorSelfTest.java
@@ -31,6 +31,8 @@ import java.util.stream.Stream;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.util.AsyncWrapper;
@@ -40,11 +42,12 @@ import org.junit.jupiter.api.Named;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
 
 /**
  * Tests for {@link TxAwareAsyncCursor}.
  */
-public class TxAwareCursorSelfTest {
+public class TxAwareCursorSelfTest extends BaseIgniteAbstractTest {
 
     /** Cursor should trigger commit of implicit transaction (if any) only if 
data is fully read. */
     @ParameterizedTest(name = "{0}")
@@ -117,6 +120,6 @@ public class TxAwareCursorSelfTest {
     }
 
     private static QueryTransactionWrapper newTxWrapper(boolean implicit) {
-        return new QueryTransactionWrapperImpl(NoOpTransaction.readOnly("TX"), 
implicit);
+        return new QueryTransactionWrapperImpl(NoOpTransaction.readOnly("TX"), 
implicit, Mockito.mock(TransactionInflights.class));
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ExplicitTxContext.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ExplicitTxContext.java
index fec70d8204..415f27d17a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ExplicitTxContext.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ExplicitTxContext.java
@@ -17,21 +17,35 @@
 
 package org.apache.ignite.internal.sql.engine.framework;
 
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.ClockWaiter;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.jetbrains.annotations.Nullable;
 
 /** Context that always returns explicit transaction. */
 public class ExplicitTxContext implements QueryTransactionContext {
+    private static final HybridClock CLOCK = new HybridClockImpl();
+
+    private static final TransactionInflights TX_INFLIGHTS = new 
TransactionInflights(
+            new TestPlacementDriver("dummy", UUID.randomUUID().toString()),
+            new ClockServiceImpl(CLOCK, new ClockWaiter("dummy", CLOCK), () -> 
1L)
+    );
+
     private final HybridTimestampTracker observableTimeTracker = new 
HybridTimestampTracker();
     private final QueryTransactionWrapper txWrapper;
 
     public static QueryTransactionContext fromTx(InternalTransaction tx) {
-        return new ExplicitTxContext(new QueryTransactionWrapperImpl(tx, 
false));
+        return new ExplicitTxContext(new QueryTransactionWrapperImpl(tx, 
false, TX_INFLIGHTS));
     }
 
     private ExplicitTxContext(QueryTransactionWrapper txWrapper) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
index 9a7b04353d..b22b24fa03 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
@@ -17,15 +17,29 @@
 
 package org.apache.ignite.internal.sql.engine.framework;
 
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.ClockWaiter;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
 import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.jetbrains.annotations.Nullable;
 
 /** Context that always creates implicit transaction. */
 public class ImplicitTxContext implements QueryTransactionContext {
+    private static final HybridClock CLOCK = new HybridClockImpl();
+
+    private static final TransactionInflights TX_INFLIGHTS = new 
TransactionInflights(
+            new TestPlacementDriver("dummy", UUID.randomUUID().toString()),
+            new ClockServiceImpl(CLOCK, new ClockWaiter("dummy", CLOCK), () -> 
1L)
+    );
+
     public static final QueryTransactionContext INSTANCE = new 
ImplicitTxContext();
 
     private final HybridTimestampTracker observableTimeTracker = new 
HybridTimestampTracker();
@@ -34,7 +48,7 @@ public class ImplicitTxContext implements 
QueryTransactionContext {
 
     @Override
     public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
-        return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy"), 
true);
+        return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy"), 
true, TX_INFLIGHTS);
     }
 
     @Override
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index a16c1bb151..2212c2c642 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -40,6 +40,7 @@ import 
org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
 import org.apache.ignite.internal.tx.TransactionResult;
+import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
@@ -192,6 +193,13 @@ public class TransactionInflights {
         abstract boolean isReadyToFinish();
     }
 
+    /**
+     * Transaction inflights for read-only transactions are needed because of 
different finishing protocol which doesn't directly close
+     * transaction resources (cursors, etc.). The finish of read-only 
transaction is a local operation, which is followed by the resources
+     * vacuum that is made in background, see {@link 
FinishedReadOnlyTransactionTracker}. Before sending
+     * {@link FinishedTransactionsBatchMessage}, the trackers needs to be sure 
that all operations (i.e. inflights) of the corresponding
+     * transaction are finished.
+     */
     private static class ReadOnlyTxContext extends TxContext {
         private volatile boolean markedFinished;
 

Reply via email to