This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5e2f1c24ae IGNITE-21868 Moved the sql RO inflights handling from
SqlQueryProcessor to QueryTransactionContext and QueryTransactionWrapper (#3511)
5e2f1c24ae is described below
commit 5e2f1c24aec704ea34a2f350bbc364e5f922d4a3
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Jun 7 16:51:56 2024 +0300
IGNITE-21868 Moved the sql RO inflights handling from SqlQueryProcessor to
QueryTransactionContext and QueryTransactionWrapper (#3511)
---
.../internal/sql/engine/SqlQueryProcessor.java | 12 +-
.../sql/engine/tx/QueryTransactionContextImpl.java | 30 ++++-
.../sql/engine/tx/QueryTransactionWrapperImpl.java | 20 +++-
.../sql/engine/tx/ScriptTransactionContext.java | 8 +-
.../engine/tx/ScriptTransactionWrapperImpl.java | 13 ++-
.../engine/QueryTransactionWrapperSelfTest.java | 128 ++++++++++++++++++---
.../sql/engine/exec/TxAwareCursorSelfTest.java | 7 +-
.../sql/engine/framework/ExplicitTxContext.java | 16 ++-
.../sql/engine/framework/ImplicitTxContext.java | 16 ++-
.../internal/tx/impl/TransactionInflights.java | 8 ++
10 files changed, 227 insertions(+), 31 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index c2e69f48fc..3f52ec6837 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -228,6 +228,8 @@ public class SqlQueryProcessor implements QueryProcessor {
private final TxManager txManager;
+ private final TransactionInflights transactionInflights;
+
/** Constructor. */
public SqlQueryProcessor(
ClusterService clusterSrvc,
@@ -265,6 +267,7 @@ public class SqlQueryProcessor implements QueryProcessor {
this.placementDriver = placementDriver;
this.clusterCfg = clusterCfg;
this.nodeCfg = nodeCfg;
+ this.transactionInflights = transactionInflights;
this.txManager = txManager;
sqlSchemaManager = new SqlSchemaManagerImpl(
@@ -499,7 +502,8 @@ public class SqlQueryProcessor implements QueryProcessor {
try {
SqlProperties properties0 = SqlPropertiesHelper.chain(properties,
DEFAULT_PROPERTIES);
- QueryTransactionContext txContext = new
QueryTransactionContextImpl(txManager, observableTimeTracker, transaction);
+ QueryTransactionContext txContext = new
QueryTransactionContextImpl(txManager, observableTimeTracker, transaction,
+ transactionInflights);
if (Commons.isMultiStatementQueryAllowed(properties0)) {
return queryScript(properties0, txContext, qry, params);
@@ -566,7 +570,7 @@ public class SqlQueryProcessor implements QueryProcessor {
validateParsedStatement(properties, result);
validateDynamicParameters(result.dynamicParamsCount(), params,
true);
- HybridTimestamp operationTime = deriveOperationTime(txContext);
+ HybridTimestamp operationTime = deriveOperationTime(txContext);
SqlOperationContext operationContext =
SqlOperationContext.builder()
.queryId(UUID.randomUUID())
@@ -620,7 +624,7 @@ public class SqlQueryProcessor implements QueryProcessor {
.queryId(UUID.randomUUID())
// time zone is used in execution phase,
// so we may use any time zone for preparation only
- .timeZoneId(DEFAULT_TIME_ZONE_ID)
+ .timeZoneId(DEFAULT_TIME_ZONE_ID)
.defaultSchemaName(schemaName)
.operationTime(timestamp)
.cancel(queryCancel)
@@ -850,7 +854,7 @@ public class SqlQueryProcessor implements QueryProcessor {
this.timeZoneId = timeZoneId;
this.schemaName = schemaName;
this.statements = prepareStatementsQueue(parsedResults, params);
- this.scriptTxContext = new ScriptTransactionContext(txContext);
+ this.scriptTxContext = new ScriptTransactionContext(txContext,
transactionInflights);
}
/**
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java
index b40e4a6951..b1c12873f8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionContextImpl.java
@@ -17,10 +17,15 @@
package org.apache.ignite.internal.sql.engine.tx;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
+
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
+import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
/**
@@ -30,16 +35,19 @@ public class QueryTransactionContextImpl implements
QueryTransactionContext {
private final TxManager txManager;
private final HybridTimestampTracker observableTimeTracker;
private final @Nullable QueryTransactionWrapper tx;
+ private final TransactionInflights transactionInflights;
/** Constructor. */
public QueryTransactionContextImpl(
TxManager txManager,
HybridTimestampTracker observableTimeTracker,
- @Nullable InternalTransaction tx
+ @Nullable InternalTransaction tx,
+ TransactionInflights transactionInflights
) {
this.txManager = txManager;
this.observableTimeTracker = observableTimeTracker;
- this.tx = tx != null ? new QueryTransactionWrapperImpl(tx, false) :
null;
+ this.tx = tx != null ? new QueryTransactionWrapperImpl(tx, false,
transactionInflights) : null;
+ this.transactionInflights = transactionInflights;
}
/**
@@ -50,13 +58,23 @@ public class QueryTransactionContextImpl implements
QueryTransactionContext {
*/
@Override
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
+ InternalTransaction transaction;
+ QueryTransactionWrapper result;
+
if (tx == null) {
- return new QueryTransactionWrapperImpl(
- txManager.begin(observableTimeTracker, readOnly), true
- );
+ transaction = txManager.begin(observableTimeTracker, readOnly);
+ result = new QueryTransactionWrapperImpl(transaction, true,
transactionInflights);
+ } else {
+ transaction = tx.unwrap();
+ result = tx;
}
- return tx;
+ // Adding inflights only for read-only transactions. See
TransactionInflights.ReadOnlyTxContext for details.
+ if (transaction.isReadOnly() &&
!transactionInflights.addInflight(transaction.id(), transaction.isReadOnly())) {
+ throw new TransactionException(TX_ALREADY_FINISHED_ERR,
format("Transaction is already finished [tx={}]", transaction));
+ }
+
+ return result;
}
@Override
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionWrapperImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionWrapperImpl.java
index 252ba2bb99..01d626f55e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionWrapperImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/QueryTransactionWrapperImpl.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.sql.engine.tx;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
/**
* Wrapper for the transaction that encapsulates the management of an implicit
transaction.
@@ -30,9 +32,21 @@ public class QueryTransactionWrapperImpl implements
QueryTransactionWrapper {
private final InternalTransaction transaction;
- public QueryTransactionWrapperImpl(InternalTransaction transaction,
boolean implicit) {
+ private final TransactionInflights transactionInflights;
+
+ private final AtomicBoolean committedImplicit = new AtomicBoolean();
+
+ /**
+ * Constructor.
+ *
+ * @param transaction Transaction.
+ * @param implicit Whether tx is implicit.
+ * @param transactionInflights Transaction inflights.
+ */
+ public QueryTransactionWrapperImpl(InternalTransaction transaction,
boolean implicit, TransactionInflights transactionInflights) {
this.transaction = transaction;
this.implicit = implicit;
+ this.transactionInflights = transactionInflights;
}
@Override
@@ -42,6 +56,10 @@ public class QueryTransactionWrapperImpl implements
QueryTransactionWrapper {
@Override
public CompletableFuture<Void> commitImplicit() {
+ if (transaction.isReadOnly() && committedImplicit.compareAndSet(false,
true)) {
+ transactionInflights.removeInflight(transaction.id());
+ }
+
if (implicit) {
return transaction.commitAsync();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
index 24e8c3cee3..1fedd2bc1d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionContext.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.sql.engine.sql.IgniteSqlCommitTransaction;
import org.apache.ignite.internal.sql.engine.sql.IgniteSqlStartTransaction;
import org.apache.ignite.internal.sql.engine.sql.IgniteSqlStartTransactionMode;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
@@ -40,13 +41,16 @@ import org.jetbrains.annotations.Nullable;
public class ScriptTransactionContext implements QueryTransactionContext {
private final QueryTransactionContextImpl txContext;
+ private final TransactionInflights transactionInflights;
+
private volatile @Nullable ScriptTransactionWrapperImpl wrapper;
/** Constructor. */
- public ScriptTransactionContext(QueryTransactionContext txContext) {
+ public ScriptTransactionContext(QueryTransactionContext txContext,
TransactionInflights transactionInflights) {
assert txContext instanceof QueryTransactionContextImpl : txContext;
this.txContext = (QueryTransactionContextImpl) txContext;
+ this.transactionInflights = transactionInflights;
}
/**
@@ -104,7 +108,7 @@ public class ScriptTransactionContext implements
QueryTransactionContext {
boolean readOnly = ((IgniteSqlStartTransaction) node).getMode() ==
IgniteSqlStartTransactionMode.READ_ONLY;
InternalTransaction tx =
txContext.getOrStartImplicit(readOnly).unwrap();
- this.wrapper = new ScriptTransactionWrapperImpl(tx);
+ this.wrapper = new ScriptTransactionWrapperImpl(tx,
transactionInflights);
return nullCompletedFuture();
} else {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
index 6257570422..2252cd995a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/tx/ScriptTransactionWrapperImpl.java
@@ -26,10 +26,12 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.sql.SqlException;
@@ -62,8 +64,13 @@ class ScriptTransactionWrapperImpl implements
QueryTransactionWrapper {
private Throwable rollbackCause;
- ScriptTransactionWrapperImpl(InternalTransaction managedTx) {
+ private final TransactionInflights transactionInflights;
+
+ private final AtomicBoolean completedTx = new AtomicBoolean();
+
+ ScriptTransactionWrapperImpl(InternalTransaction managedTx,
TransactionInflights transactionInflights) {
this.managedTx = managedTx;
+ this.transactionInflights = transactionInflights;
}
@Override
@@ -184,6 +191,10 @@ class ScriptTransactionWrapperImpl implements
QueryTransactionWrapper {
default:
throw new IllegalStateException("Unknown transaction target
state: " + txState);
}
+
+ if (managedTx.isReadOnly() && completedTx.compareAndSet(false, true)) {
+ transactionInflights.removeInflight(managedTx.id());
+ }
}
private void completeTxFuture(Void unused, Throwable e) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
index 6f85451754..f12c5615cb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
@@ -20,15 +20,23 @@ package org.apache.ignite.internal.sql.engine;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
+import org.apache.ignite.internal.sql.engine.sql.IgniteSqlCommitTransaction;
import org.apache.ignite.internal.sql.engine.sql.IgniteSqlStartTransaction;
+import org.apache.ignite.internal.sql.engine.sql.IgniteSqlStartTransactionMode;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContextImpl;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
@@ -37,6 +45,7 @@ import
org.apache.ignite.internal.sql.engine.tx.ScriptTransactionContext;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -50,20 +59,21 @@ import org.mockito.junit.jupiter.MockitoExtension;
public class QueryTransactionWrapperSelfTest extends BaseIgniteAbstractTest {
@Mock
private HybridTimestampTracker observableTimeTracker;
+
@Mock
private TxManager txManager;
+ @Mock
+ private TransactionInflights transactionInflights;
+
@Test
public void testImplicitTransactionAttributes() {
- when(txManager.begin(any(), anyBoolean())).thenAnswer(
- inv -> {
- boolean readOnly = inv.getArgument(1, Boolean.class);
+ prepareTransactionsMocks();
- return readOnly ? NoOpTransaction.readOnly("test-ro") :
NoOpTransaction.readWrite("test-rw");
- }
- );
+ when(transactionInflights.addInflight(any(),
anyBoolean())).thenAnswer(inv -> true);
- QueryTransactionContext transactionHandler = new
QueryTransactionContextImpl(txManager, observableTimeTracker, null);
+ QueryTransactionContext transactionHandler = new
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
+ transactionInflights);
QueryTransactionWrapper transactionWrapper =
transactionHandler.getOrStartImplicit(false);
assertThat(transactionWrapper.unwrap().isReadOnly(), equalTo(false));
@@ -76,7 +86,7 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
public void commitImplicitTxNotAffectExternalTransaction() {
NoOpTransaction externalTx = new NoOpTransaction("test");
- QueryTransactionWrapperImpl wrapper = new
QueryTransactionWrapperImpl(externalTx, false);
+ QueryTransactionWrapperImpl wrapper = new
QueryTransactionWrapperImpl(externalTx, false, transactionInflights);
wrapper.commitImplicit();
assertFalse(externalTx.commitFuture().isDone());
}
@@ -84,7 +94,7 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
@Test
public void testCommitImplicit() {
NoOpTransaction tx = new NoOpTransaction("test");
- QueryTransactionWrapperImpl wrapper = new
QueryTransactionWrapperImpl(tx, true);
+ QueryTransactionWrapperImpl wrapper = new
QueryTransactionWrapperImpl(tx, true, transactionInflights);
wrapper.commitImplicit();
@@ -95,7 +105,7 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
@Test
public void testRollbackImplicit() {
NoOpTransaction tx = new NoOpTransaction("test");
- QueryTransactionWrapperImpl wrapper = new
QueryTransactionWrapperImpl(tx, true);
+ QueryTransactionWrapperImpl wrapper = new
QueryTransactionWrapperImpl(tx, true, transactionInflights);
wrapper.rollback(null);
@@ -106,7 +116,9 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
@Test
public void
throwsExceptionForTxControlStatementInsideExternalTransaction() {
ScriptTransactionContext txCtx = new ScriptTransactionContext(
- new QueryTransactionContextImpl(txManager,
observableTimeTracker, new NoOpTransaction("test")));
+ new QueryTransactionContextImpl(txManager,
observableTimeTracker, new NoOpTransaction("test"), transactionInflights),
+ transactionInflights
+ );
assertThrowsExactly(TxControlInsideExternalTxNotSupportedException.class, () ->
txCtx.handleControlStatement(null));
}
@@ -114,11 +126,12 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
@Test
public void throwsExceptionForNestedScriptTransaction() {
ScriptTransactionContext txCtx = new ScriptTransactionContext(
- new QueryTransactionContextImpl(txManager,
observableTimeTracker, null)
+ new QueryTransactionContextImpl(txManager,
observableTimeTracker, null, transactionInflights),
+ transactionInflights
);
IgniteSqlStartTransaction txStartStmt =
mock(IgniteSqlStartTransaction.class);
- when(txManager.begin(any(), anyBoolean())).thenReturn(new
NoOpTransaction("test"));
+ when(txManager.begin(any(),
anyBoolean())).thenReturn(NoOpTransaction.readWrite("test"));
txCtx.handleControlStatement(txStartStmt);
@@ -129,4 +142,93 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
() -> txCtx.handleControlStatement(txStartStmt)
);
}
+
+ @Test
+ public void testQueryTransactionWrapperTxInflightsInteraction() {
+ Set<UUID> inflights = new HashSet<>();
+
+ prepareTxInflightsMocks(inflights);
+
+ prepareTransactionsMocks();
+
+ QueryTransactionContext implicitDmlTxCtx = new
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
+ transactionInflights);
+ implicitDmlTxCtx.getOrStartImplicit(false);
+ // Check that RW txns do not create tx inflights.
+ assertTrue(inflights.isEmpty());
+
+ QueryTransactionContext implicitQueryTxCtx = new
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
+ transactionInflights);
+ QueryTransactionWrapper implicitQueryTxWrapper =
implicitQueryTxCtx.getOrStartImplicit(true);
+ assertTrue(inflights.contains(implicitQueryTxWrapper.unwrap().id()));
+ implicitQueryTxWrapper.commitImplicit();
+ assertTrue(inflights.isEmpty());
+
+ NoOpTransaction rwTx = NoOpTransaction.readWrite("test-rw");
+ QueryTransactionContext explicitRwTxCtx = new
QueryTransactionContextImpl(txManager, observableTimeTracker, rwTx,
+ transactionInflights);
+ explicitRwTxCtx.getOrStartImplicit(true);
+ // Check that RW txns do not create tx inflights.
+ assertTrue(inflights.isEmpty());
+
+ NoOpTransaction roTx = NoOpTransaction.readOnly("test-ro");
+ QueryTransactionContext explicitRoTxCtx = new
QueryTransactionContextImpl(txManager, observableTimeTracker, roTx,
+ transactionInflights);
+ QueryTransactionWrapper explicitRoTxWrapper =
explicitRoTxCtx.getOrStartImplicit(true);
+ assertTrue(inflights.contains(explicitRoTxWrapper.unwrap().id()));
+ explicitRoTxWrapper.commitImplicit();
+ assertTrue(inflights.isEmpty());
+ }
+
+ @Test
+ public void testScriptTransactionWrapperTxInflightsInteraction() {
+ Set<UUID> inflights = new HashSet<>();
+
+ prepareTxInflightsMocks(inflights);
+
+ prepareTransactionsMocks();
+
+ QueryTransactionContext txCtx = new
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
transactionInflights);
+ ScriptTransactionContext scriptRwTxCtx = new
ScriptTransactionContext(txCtx, transactionInflights);
+
+ IgniteSqlStartTransaction sqlStartRwTx =
mock(IgniteSqlStartTransaction.class);
+ when(sqlStartRwTx.getMode()).thenAnswer(inv ->
IgniteSqlStartTransactionMode.READ_WRITE);
+
+ scriptRwTxCtx.handleControlStatement(sqlStartRwTx);
+ assertTrue(inflights.isEmpty());
+
+ ScriptTransactionContext scriptRoTxCtx = new
ScriptTransactionContext(txCtx, transactionInflights);
+ IgniteSqlStartTransaction sqlStartRoTx =
mock(IgniteSqlStartTransaction.class);
+ when(sqlStartRoTx.getMode()).thenAnswer(inv ->
IgniteSqlStartTransactionMode.READ_ONLY);
+
+ scriptRoTxCtx.handleControlStatement(sqlStartRoTx);
+ assertEquals(1, inflights.size());
+
+ QueryTransactionWrapper wrapper =
scriptRoTxCtx.getOrStartImplicit(true);
+ assertEquals(1, inflights.size());
+
+ // ScriptTransactionWrapperImpl.commitImplicit is noop.
+ wrapper.commitImplicit();
+ assertEquals(1, inflights.size());
+
+ IgniteSqlCommitTransaction sqlCommitTx =
mock(IgniteSqlCommitTransaction.class);
+ scriptRoTxCtx.handleControlStatement(sqlCommitTx);
+ assertTrue(inflights.isEmpty());
+ }
+
+ private void prepareTransactionsMocks() {
+ when(txManager.begin(any(), anyBoolean())).thenAnswer(
+ inv -> {
+ boolean readOnly = inv.getArgument(1, Boolean.class);
+
+ return readOnly ? NoOpTransaction.readOnly("test-ro") :
NoOpTransaction.readWrite("test-rw");
+ }
+ );
+ }
+
+ private void prepareTxInflightsMocks(Set<UUID> inflights) {
+ when(transactionInflights.addInflight(any(),
anyBoolean())).thenAnswer(inv -> inflights.add(inv.getArgument(0)));
+
+ doAnswer(inv ->
inflights.remove(inv.getArgument(0))).when(transactionInflights).removeInflight(any());
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TxAwareCursorSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TxAwareCursorSelfTest.java
index 904202895e..7889639379 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TxAwareCursorSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TxAwareCursorSelfTest.java
@@ -31,6 +31,8 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.util.AsyncWrapper;
@@ -40,11 +42,12 @@ import org.junit.jupiter.api.Named;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
/**
* Tests for {@link TxAwareAsyncCursor}.
*/
-public class TxAwareCursorSelfTest {
+public class TxAwareCursorSelfTest extends BaseIgniteAbstractTest {
/** Cursor should trigger commit of implicit transaction (if any) only if
data is fully read. */
@ParameterizedTest(name = "{0}")
@@ -117,6 +120,6 @@ public class TxAwareCursorSelfTest {
}
private static QueryTransactionWrapper newTxWrapper(boolean implicit) {
- return new QueryTransactionWrapperImpl(NoOpTransaction.readOnly("TX"),
implicit);
+ return new QueryTransactionWrapperImpl(NoOpTransaction.readOnly("TX"),
implicit, Mockito.mock(TransactionInflights.class));
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ExplicitTxContext.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ExplicitTxContext.java
index fec70d8204..415f27d17a 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ExplicitTxContext.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ExplicitTxContext.java
@@ -17,21 +17,35 @@
package org.apache.ignite.internal.sql.engine.framework;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.ClockWaiter;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.jetbrains.annotations.Nullable;
/** Context that always returns explicit transaction. */
public class ExplicitTxContext implements QueryTransactionContext {
+ private static final HybridClock CLOCK = new HybridClockImpl();
+
+ private static final TransactionInflights TX_INFLIGHTS = new
TransactionInflights(
+ new TestPlacementDriver("dummy", UUID.randomUUID().toString()),
+ new ClockServiceImpl(CLOCK, new ClockWaiter("dummy", CLOCK), () ->
1L)
+ );
+
private final HybridTimestampTracker observableTimeTracker = new
HybridTimestampTracker();
private final QueryTransactionWrapper txWrapper;
public static QueryTransactionContext fromTx(InternalTransaction tx) {
- return new ExplicitTxContext(new QueryTransactionWrapperImpl(tx,
false));
+ return new ExplicitTxContext(new QueryTransactionWrapperImpl(tx,
false, TX_INFLIGHTS));
}
private ExplicitTxContext(QueryTransactionWrapper txWrapper) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
index 9a7b04353d..b22b24fa03 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ImplicitTxContext.java
@@ -17,15 +17,29 @@
package org.apache.ignite.internal.sql.engine.framework;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.ClockWaiter;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapperImpl;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.jetbrains.annotations.Nullable;
/** Context that always creates implicit transaction. */
public class ImplicitTxContext implements QueryTransactionContext {
+ private static final HybridClock CLOCK = new HybridClockImpl();
+
+ private static final TransactionInflights TX_INFLIGHTS = new
TransactionInflights(
+ new TestPlacementDriver("dummy", UUID.randomUUID().toString()),
+ new ClockServiceImpl(CLOCK, new ClockWaiter("dummy", CLOCK), () ->
1L)
+ );
+
public static final QueryTransactionContext INSTANCE = new
ImplicitTxContext();
private final HybridTimestampTracker observableTimeTracker = new
HybridTimestampTracker();
@@ -34,7 +48,7 @@ public class ImplicitTxContext implements
QueryTransactionContext {
@Override
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
- return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy"),
true);
+ return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy"),
true, TX_INFLIGHTS);
}
@Override
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index a16c1bb151..2212c2c642 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
import org.apache.ignite.internal.tx.TransactionResult;
+import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -192,6 +193,13 @@ public class TransactionInflights {
abstract boolean isReadyToFinish();
}
+ /**
+ * Transaction inflights for read-only transactions are needed because of
a different finishing protocol which doesn't directly close
+ * transaction resources (cursors, etc.). The finish of read-only
transaction is a local operation, which is followed by the resources
+ * vacuum that is made in the background, see {@link
FinishedReadOnlyTransactionTracker}. Before sending
+ * {@link FinishedTransactionsBatchMessage}, the tracker needs to be sure
that all operations (i.e. inflights) of the corresponding
+ * transaction are finished.
+ */
private static class ReadOnlyTxContext extends TxContext {
private volatile boolean markedFinished;