This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c80510d0c2 IGNITE-21618 In-flights for read-only transactions (#3371)
c80510d0c2 is described below
commit c80510d0c225593de02f4d6592ecfcbb9837a535
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Mar 19 12:31:42 2024 +0300
IGNITE-21618 In-flights for read-only transactions (#3371)
---
check-rules/spotbugs-excludes.xml | 14 +-
.../apache/ignite/client/fakes/FakeTxManager.java | 10 -
.../runner/app/ItIgniteNodeRestartTest.java | 15 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 15 +-
.../internal/sql/engine/SqlQueryProcessor.java | 39 ++-
.../exec/rel/TableScanNodeExecutionTest.java | 16 +-
.../apache/ignite/distributed/ItLockTableTest.java | 7 +-
...xDistributedTestSingleNodeNoCleanupMessage.java | 7 +-
.../rebalance/ItRebalanceDistributedTest.java | 12 +-
.../ignite/internal/table/ItColocationTest.java | 17 +-
.../internal/table/distributed/TableManager.java | 11 +-
.../distributed/storage/InternalTableImpl.java | 95 ++++++-
.../distributed/TableManagerRecoveryTest.java | 9 +-
.../table/distributed/TableManagerTest.java | 4 +-
.../distributed/storage/InternalTableImplTest.java | 7 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 34 ++-
.../table/impl/DummyInternalTableImpl.java | 25 +-
.../org/apache/ignite/internal/tx/TxManager.java | 15 -
.../impl/FinishedReadOnlyTransactionTracker.java | 52 ++--
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 9 +-
.../internal/tx/impl/ResourceCleanupManager.java | 17 +-
.../internal/tx/impl/TransactionInflights.java | 316 +++++++++++++++++++++
.../ignite/internal/tx/impl/TxManagerImpl.java | 198 ++-----------
.../apache/ignite/internal/tx/TxManagerTest.java | 9 +-
.../tx/impl/ReadOnlyTransactionImplTest.java | 6 +-
25 files changed, 647 insertions(+), 312 deletions(-)
diff --git a/check-rules/spotbugs-excludes.xml
b/check-rules/spotbugs-excludes.xml
index eb117abc40..c35cd0fc21 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -186,16 +186,22 @@
<Class
name="org.apache.ignite.internal.pagememory.persistence.checkpoint.Checkpointer"/>
<Method name="cancel"/>
</Match>
+ <Match>
+ <!-- duplicate reported because constants have same value -->
+ <Bug pattern="DB_DUPLICATE_BRANCHES"/>
+ <Source name="AbstractPageMemoryIndexStorage.java"/>
+ </Match>
<Match>
<!-- suppressed in code as well -->
<Bug pattern="VO_VOLATILE_INCREMENT"/>
- <Class name="org.apache.ignite.internal.tx.impl.TxManagerImpl"/>
+ <Class
name="org.apache.ignite.internal.tx.impl.TransactionInflights$TxContext"/>
<Field name="inflights"/>
</Match>
<Match>
- <!-- duplicate reported because constants have same value -->
- <Bug pattern="DB_DUPLICATE_BRANCHES"/>
- <Source name="AbstractPageMemoryIndexStorage.java"/>
+ <!-- suppressed in code as well -->
+ <Bug pattern="VO_VOLATILE_DECREMENT"/>
+ <Class
name="org.apache.ignite.internal.tx.impl.TransactionInflights$TxContext"/>
+ <Field name="inflights"/>
</Match>
<!-- end of false-positive exclusions -->
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 3fe97ca37f..97acd48bd9 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -215,16 +215,6 @@ public class FakeTxManager implements TxManager {
return null;
}
- @Override
- public boolean addInflight(UUID txId) {
- return false;
- }
-
- @Override
- public void removeInflight(UUID txId) {
- // No-op.
- }
-
@Override
public HybridClock clock() {
return clock;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a8afc088eb..6286d7efcb 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -179,6 +179,7 @@ import
org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
@@ -455,11 +456,14 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
+ TransactionInflights transactionInflights = new
TransactionInflights(placementDriverManager.placementDriver());
+
ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
name,
resourcesRegistry,
clusterSvc.topologyService(),
- clusterSvc.messagingService()
+ clusterSvc.messagingService(),
+ transactionInflights
);
var txManager = new TxManagerImpl(
@@ -476,7 +480,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
new TestLocalRwTxCounter(),
threadPoolsManager.partitionOperationsExecutor(),
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
);
ConfigurationRegistry clusterConfigRegistry =
clusterCfgMgr.configurationRegistry();
@@ -574,7 +579,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
resourcesRegistry,
rebalanceScheduler,
lowWatermark,
- ForkJoinPool.commonPool()
+ ForkJoinPool.commonPool(),
+ transactionInflights
);
var indexManager = new IndexManager(
@@ -605,7 +611,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
failureProcessor,
placementDriverManager.placementDriver(),
clusterConfigRegistry.getConfiguration(SqlDistributedConfiguration.KEY),
-
nodeCfgMgr.configurationRegistry().getConfiguration(SqlLocalConfiguration.KEY)
+
nodeCfgMgr.configurationRegistry().getConfiguration(SqlLocalConfiguration.KEY),
+ transactionInflights
);
sqlRef.set(new IgniteSqlImpl(name, qryEngine, new
IgniteTransactionsImpl(txManager, new HybridTimestampTracker())));
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index ff2a4da14e..540ed9b62f 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -201,6 +201,7 @@ import
org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.vault.VaultManager;
@@ -673,11 +674,14 @@ public class IgniteImpl implements Ignite {
resourcesRegistry = new RemotelyTriggeredResourceRegistry();
+ TransactionInflights transactionInflights = new
TransactionInflights(placementDriverMgr.placementDriver());
+
resourceCleanupManager = new ResourceCleanupManager(
name,
resourcesRegistry,
clusterSvc.topologyService(),
- messagingServiceReturningToStorageOperationsPool
+ messagingServiceReturningToStorageOperationsPool,
+ transactionInflights
);
// TODO: IGNITE-19344 - use nodeId that is validated on join (and
probably generated differently).
@@ -695,7 +699,8 @@ public class IgniteImpl implements Ignite {
indexNodeFinishedRwTransactionsChecker,
threadPoolsManager.partitionOperationsExecutor(),
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
);
StorageUpdateConfiguration storageUpdateConfiguration =
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
@@ -734,7 +739,8 @@ public class IgniteImpl implements Ignite {
resourcesRegistry,
rebalanceScheduler,
lowWatermark,
- asyncContinuationExecutor
+ asyncContinuationExecutor,
+ transactionInflights
);
indexManager = new IndexManager(
@@ -776,7 +782,8 @@ public class IgniteImpl implements Ignite {
failureProcessor,
placementDriverMgr.placementDriver(),
clusterConfigRegistry.getConfiguration(SqlDistributedConfiguration.KEY),
- nodeConfigRegistry.getConfiguration(SqlLocalConfiguration.KEY)
+ nodeConfigRegistry.getConfiguration(SqlLocalConfiguration.KEY),
+ transactionInflights
);
sql = new IgniteSqlImpl(name, qryEngine, new
IgniteTransactionsImpl(txManager, observableTimestampTracker));
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 5508017a73..2a276418de 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
@@ -26,11 +27,13 @@ import static
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFI
import static
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -120,6 +123,7 @@ import
org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.AsyncWrapper;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -130,6 +134,7 @@ import org.apache.ignite.lang.SchemaNotFoundException;
import org.apache.ignite.sql.ResultSetMetadata;
import org.apache.ignite.sql.SqlException;
import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -224,6 +229,8 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Node SQL configuration. */
private final SqlLocalConfiguration nodeCfg;
+ private final TransactionInflights transactionInflights;
+
/** Constructor. */
public SqlQueryProcessor(
Consumer<LongFunction<CompletableFuture<?>>> registry,
@@ -242,7 +249,8 @@ public class SqlQueryProcessor implements QueryProcessor {
FailureProcessor failureProcessor,
PlacementDriver placementDriver,
SqlDistributedConfiguration clusterCfg,
- SqlLocalConfiguration nodeCfg
+ SqlLocalConfiguration nodeCfg,
+ TransactionInflights transactionInflights
) {
this.clusterSrvc = clusterSrvc;
this.logicalTopologyService = logicalTopologyService;
@@ -260,6 +268,7 @@ public class SqlQueryProcessor implements QueryProcessor {
this.placementDriver = placementDriver;
this.clusterCfg = clusterCfg;
this.nodeCfg = nodeCfg;
+ this.transactionInflights = transactionInflights;
sqlSchemaManager = new SqlSchemaManagerImpl(
catalogManager,
@@ -320,7 +329,7 @@ public class SqlQueryProcessor implements QueryProcessor {
@Override
public CompletableFuture<ExecutionTarget>
forTable(ExecutionTargetFactory factory, IgniteTable table) {
return primaryReplicas(table)
- .thenApply(replicas -> factory.partitioned(replicas));
+ .thenApply(factory::partitioned);
}
@Override
@@ -328,7 +337,7 @@ public class SqlQueryProcessor implements QueryProcessor {
List<String> nodes =
systemViewManager.owningNodes(view.name());
if (nullOrEmpty(nodes)) {
- return CompletableFuture.failedFuture(
+ return failedFuture(
new SqlException(Sql.MAPPING_ERR, format("The view
with name '{}' could not be found on"
+ " any active nodes in the cluster",
view.name()))
);
@@ -568,7 +577,27 @@ public class SqlQueryProcessor implements QueryProcessor {
QueryTransactionWrapper txWrapper =
txCtx.getOrStartImplicit(result.queryType());
- return executeParsedStatement(schemaName, result, txWrapper,
queryCancel, timeZoneId, params, null);
+ InternalTransaction tx = txWrapper.unwrap();
+
+ // Adding inflights only for read-only transactions.
+ if (tx.isReadOnly() && !transactionInflights.addInflight(tx.id(),
tx.isReadOnly())) {
+ return failedFuture(new TransactionException(
+ TX_ALREADY_FINISHED_ERR, format("Transaction is
already finished [tx={}]", tx)
+ ));
+ }
+
+ return executeParsedStatement(schemaName, result, txWrapper,
queryCancel, timeZoneId, params, null)
+ .handle((executionResult, e) -> {
+ if (tx.isReadOnly()) {
+
transactionInflights.removeInflight(txWrapper.unwrap().id());
+ }
+
+ if (e != null) {
+ sneakyThrow(e);
+ }
+
+ return executionResult;
+ });
});
// TODO IGNITE-20078 Improve (or remove) CancellationException
handling.
@@ -672,7 +701,7 @@ public class SqlQueryProcessor implements QueryProcessor {
return schema;
});
} catch (Throwable t) {
- return CompletableFuture.failedFuture(t);
+ return failedFuture(t);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index b24e441924..6aa22be45f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
@@ -147,11 +148,16 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
RemotelyTriggeredResourceRegistry resourcesRegistry = new
RemotelyTriggeredResourceRegistry();
+ PlacementDriver placementDriver = new
TestPlacementDriver(leaseholder, leaseholder);
+
+ TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
+
ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
leaseholder,
resourcesRegistry,
clusterService.topologyService(),
- clusterService.messagingService()
+ clusterService.messagingService(),
+ transactionInflights
);
TxManagerImpl txManager = new TxManagerImpl(
@@ -161,11 +167,12 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
new HeapLockManager(),
new HybridClockImpl(),
new TransactionIdGenerator(0xdeadbeef),
- new TestPlacementDriver(leaseholder, leaseholder),
+ placementDriver,
() ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
);
txManager.start();
@@ -245,7 +252,8 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
PART_CNT,
Int2ObjectMaps.singleton(0,
mock(RaftGroupService.class)),
new
SingleClusterNodeResolver(mock(ClusterNode.class))
- )
+ ),
+ mock(TransactionInflights.class)
);
this.dataAmount = dataAmount;
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
index 99296eb7de..f1470e47fd 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.type.NativeTypes;
@@ -133,7 +134,8 @@ public class ItLockTableTest extends IgniteAbstractTest {
ClusterNode node,
PlacementDriver placementDriver,
RemotelyTriggeredResourceRegistry resourcesRegistry,
- ResourceCleanupManager resourceCleanupManager
+ ResourceCleanupManager resourceCleanupManager,
+ TransactionInflights transactionInflights
) {
return new TxManagerImpl(
txConfiguration,
@@ -150,7 +152,8 @@ public class ItLockTableTest extends IgniteAbstractTest {
() ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
);
}
};
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 389259130e..7316ce78ab 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -116,7 +117,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
ClusterNode node,
PlacementDriver placementDriver,
RemotelyTriggeredResourceRegistry resourcesRegistry,
- ResourceCleanupManager resourceCleanupManager
+ ResourceCleanupManager resourceCleanupManager,
+ TransactionInflights transactionInflights
) {
return new TxManagerImpl(
txConfiguration,
@@ -129,7 +131,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends TxAbstractTes
() ->
DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
) {
@Override
public CompletableFuture<Void>
executeWriteIntentSwitchAsync(Runnable runnable) {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index f2ee075757..c1e7d69085 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -188,6 +188,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
@@ -1090,11 +1091,14 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
+ TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
+
ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
name,
resourcesRegistry,
clusterService.topologyService(),
- clusterService.messagingService()
+ clusterService.messagingService(),
+ transactionInflights
);
txManager = new TxManagerImpl(
@@ -1108,7 +1112,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
partitionIdleSafeTimePropagationPeriodMsSupplier,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
);
cfgStorage = new DistributedConfigurationStorage("test",
metaStorageManager);
@@ -1216,7 +1221,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
resourcesRegistry,
rebalanceScheduler,
lowWatermark,
- ForkJoinPool.commonPool()
+ ForkJoinPool.commonPool(),
+ transactionInflights
) {
@Override
protected TxStateTableStorage createTxStateTableStorage(
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 4d321081e6..d599540c39 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -100,6 +101,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
@@ -166,11 +168,16 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
RemotelyTriggeredResourceRegistry resourcesRegistry = new
RemotelyTriggeredResourceRegistry();
+ PlacementDriver placementDriver = new TestPlacementDriver(clusterNode);
+
+ TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
+
ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
clusterNode.name(),
resourcesRegistry,
clusterService.topologyService(),
- clusterService.messagingService()
+ clusterService.messagingService(),
+ transactionInflights
);
txManager = new TxManagerImpl(
@@ -180,11 +187,12 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
new HeapLockManager(),
new HybridClockImpl(),
new TransactionIdGenerator(0xdeadbeef),
- new TestPlacementDriver(clusterNode),
+ placementDriver,
() -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
) {
@Override
public CompletableFuture<Void> finish(
@@ -292,7 +300,8 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
new HybridClockImpl(),
observableTimestampTracker,
new TestPlacementDriver(clusterNode),
- new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new
SingleClusterNodeResolver(clusterNode))
+ new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new
SingleClusterNodeResolver(clusterNode)),
+ transactionInflights
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e8d3d12ae0..216de7595c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -198,6 +198,7 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import
org.apache.ignite.internal.tx.storage.state.ThreadAssertingTxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -398,6 +399,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final Executor asyncContinuationExecutor;
+ private final TransactionInflights transactionInflights;
+
/**
* Creates a new table manager.
*
@@ -425,6 +428,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
* @param lowWatermark Low watermark.
* @param asyncContinuationExecutor Executor to which execution will be
resubmitted when leaving asynchronous public API endpoints
* (so as to prevent the user from stealing Ignite threads).
+ * @param transactionInflights Transaction inflights.
*/
public TableManager(
String nodeName,
@@ -458,7 +462,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry,
ScheduledExecutorService rebalanceScheduler,
LowWatermark lowWatermark,
- Executor asyncContinuationExecutor
+ Executor asyncContinuationExecutor,
+ TransactionInflights transactionInflights
) {
this.topologyService = topologyService;
this.raftMgr = raftMgr;
@@ -484,6 +489,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
this.rebalanceScheduler = rebalanceScheduler;
this.lowWatermark = lowWatermark;
this.asyncContinuationExecutor = asyncContinuationExecutor;
+ this.transactionInflights = transactionInflights;
this.executorInclinedSchemaSyncService = new
ExecutorInclinedSchemaSyncService(schemaSyncService,
partitionOperationsExecutor);
this.executorInclinedPlacementDriver = new
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -1289,7 +1295,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
clock,
observableTimestampTracker,
executorInclinedPlacementDriver,
- tableRaftService
+ tableRaftService,
+ transactionInflights
);
var table = new TableImpl(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index aaa32b6284..68446acc84 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -120,6 +121,10 @@ public class InternalTableImpl implements InternalTable {
/** Primary replica await timeout. */
public static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 30;
+ /** Default no-op implementation to avoid unnecessary allocations. */
+ private static final ReadWriteInflightBatchRequestTracker
READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER =
+ new ReadWriteInflightBatchRequestTracker();
+
/** Partitions. */
private final int partitions;
@@ -135,6 +140,8 @@ public class InternalTableImpl implements InternalTable {
/** Transactional manager. */
protected final TxManager txManager;
+ private final TransactionInflights transactionInflights;
+
/** Storage for table data. */
private final MvTableStorage tableStorage;
@@ -182,6 +189,7 @@ public class InternalTableImpl implements InternalTable {
* @param clock A hybrid logical clock.
* @param placementDriver Placement driver.
* @param tableRaftService Table raft service.
+ * @param transactionInflights Transaction inflights.
*/
public InternalTableImpl(
String tableName,
@@ -195,7 +203,8 @@ public class InternalTableImpl implements InternalTable {
HybridClock clock,
HybridTimestampTracker observableTimestampTracker,
PlacementDriver placementDriver,
- TableRaftServiceImpl tableRaftService
+ TableRaftServiceImpl tableRaftService,
+ TransactionInflights transactionInflights
) {
this.tableName = tableName;
this.tableId = tableId;
@@ -210,6 +219,7 @@ public class InternalTableImpl implements InternalTable {
this.observableTimestampTracker = observableTimestampTracker;
this.placementDriver = placementDriver;
this.tableRaftService = tableRaftService;
+ this.transactionInflights = transactionInflights;
}
/** {@inheritDoc} */
@@ -532,6 +542,8 @@ public class InternalTableImpl implements InternalTable {
@Nullable BiPredicate<R, ReplicaRequest> noWriteChecker,
boolean retryOnLockConflict
) {
+ assert !tx.isReadOnly() : format("Tracking invoke is available only
for read-write transactions [tx={}].", tx);
+
ReplicaRequest request =
mapFunc.apply(primaryReplicaAndConsistencyToken.get2());
boolean write = request instanceof SingleRowReplicaRequest &&
((SingleRowReplicaRequest) request).requestType() != RW_GET
@@ -542,7 +554,7 @@ public class InternalTableImpl implements InternalTable {
if (write && !full) {
// Track only write requests from explicit transactions.
- if (!txManager.addInflight(tx.id())) {
+ if (!transactionInflights.addInflight(tx.id(), false)) {
return failedFuture(
new TransactionException(TX_ALREADY_FINISHED_ERR,
format(
"Transaction is already finished
[tableName={}, partId={}, txState={}].",
@@ -557,13 +569,13 @@ public class InternalTableImpl implements InternalTable {
// Remove inflight if no replication was scheduled, otherwise
inflight will be removed by delayed response.
if (noWriteChecker.test(res, request)) {
- txManager.removeInflight(tx.id());
+ transactionInflights.removeInflight(tx.id());
}
return res;
}).exceptionally(e -> {
if (retryOnLockConflict && e.getCause() instanceof
LockException) {
- txManager.removeInflight(tx.id()); // Will be retried.
+ transactionInflights.removeInflight(tx.id()); // Will be
retried.
}
ExceptionUtils.sneakyThrow(e);
@@ -1469,7 +1481,8 @@ public class InternalTableImpl implements InternalTable {
return replicaSvc.invoke(recipientNode, request);
},
// TODO: IGNITE-17666 Close cursor tx finish.
- (intentionallyClose, fut) -> completeScan(txId,
tablePartitionId, fut, recipientNode, intentionallyClose)
+ (intentionallyClose, fut) -> completeScan(txId,
tablePartitionId, fut, recipientNode, intentionallyClose),
+ new ReadOnlyInflightBatchRequestTracker(transactionInflights,
txId)
);
}
@@ -1530,7 +1543,8 @@ public class InternalTableImpl implements InternalTable {
}
return postEnlist(opFut, intentionallyClose, actualTx,
implicit && !intentionallyClose);
- }
+ },
+ READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER
);
}
@@ -1572,7 +1586,9 @@ public class InternalTableImpl implements InternalTable {
return replicaSvc.invoke(recipient.node(), request);
},
// TODO: IGNITE-17666 Close cursor tx finish.
- (intentionallyClose, fut) -> completeScan(txId,
tablePartitionId, fut, recipient.node(), intentionallyClose));
+ (intentionallyClose, fut) -> completeScan(txId,
tablePartitionId, fut, recipient.node(), intentionallyClose),
+ READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER
+ );
}
/**
@@ -1801,19 +1817,24 @@ public class InternalTableImpl implements InternalTable
{
/** True when the publisher has a subscriber, false otherwise. */
private final AtomicBoolean subscribed;
+ private final InflightBatchRequestTracker inflightBatchRequestTracker;
+
/**
* The constructor.
*
* @param retrieveBatch Closure that gets a new batch from the remote
replica.
* @param onClose The closure will be applied when {@link
Subscription#cancel} is invoked directly or the cursor is
* finished.
+ * @param inflightBatchRequestTracker {@link
InflightBatchRequestTracker} to track batch request completion.
*/
PartitionScanPublisher(
BiFunction<Long, Integer,
CompletableFuture<Collection<BinaryRow>>> retrieveBatch,
- BiFunction<Boolean, CompletableFuture<Long>,
CompletableFuture<Void>> onClose
+ BiFunction<Boolean, CompletableFuture<Long>,
CompletableFuture<Void>> onClose,
+ InflightBatchRequestTracker inflightBatchRequestTracker
) {
this.retrieveBatch = retrieveBatch;
this.onClose = onClose;
+ this.inflightBatchRequestTracker = inflightBatchRequestTracker;
this.subscribed = new AtomicBoolean(false);
}
@@ -1926,10 +1947,14 @@ public class InternalTableImpl implements InternalTable
{
return;
}
+ inflightBatchRequestTracker.onRequestBegin();
+
retrieveBatch.apply(scanId, n).thenAccept(binaryRows -> {
assert binaryRows != null;
assert binaryRows.size() <= n : "Rows more then requested
" + binaryRows.size() + " " + n;
+ inflightBatchRequestTracker.onRequestEnd();
+
binaryRows.forEach(subscriber::onNext);
if (binaryRows.size() < n) {
@@ -1942,6 +1967,8 @@ public class InternalTableImpl implements InternalTable {
}
}
}).exceptionally(t -> {
+ inflightBatchRequestTracker.onRequestEnd();
+
cancel(t, false);
return null;
@@ -1950,6 +1977,58 @@ public class InternalTableImpl implements InternalTable {
}
}
+ /**
+ * Created for {@code PartitionScanSubscription} to track inflight batch
requests.
+ */
+ private interface InflightBatchRequestTracker {
+ void onRequestBegin();
+
+ void onRequestEnd();
+ }
+
+ /**
+ * This is, in fact, a no-op {@code InflightBatchRequestTracker} because
tracking batch requests for read-write transactions is not
+ * needed.
+ */
+ private static class ReadWriteInflightBatchRequestTracker implements
InflightBatchRequestTracker {
+ @Override
+ public void onRequestBegin() {
+ // No-op.
+ }
+
+ @Override
+ public void onRequestEnd() {
+ // No-op.
+ }
+ }
+
+ private static class ReadOnlyInflightBatchRequestTracker implements
InflightBatchRequestTracker {
+ private final TransactionInflights transactionInflights;
+
+ private final UUID txId;
+
+ ReadOnlyInflightBatchRequestTracker(TransactionInflights
transactionInflights, UUID txId) {
+ this.transactionInflights = transactionInflights;
+ this.txId = txId;
+ }
+
+ @Override
+ public void onRequestBegin() {
+ // Track read only requests which are able to create cursors.
+ if (!transactionInflights.addInflight(txId, true)) {
+ throw new TransactionException(TX_ALREADY_FINISHED_ERR, format(
+ "Transaction is already finished () [txId={},
readOnly=true].",
+ txId
+ ));
+ }
+ }
+
+ @Override
+ public void onRequestEnd() {
+ transactionInflights.removeInflight(txId);
+ }
+ }
+
/** {@inheritDoc} */
@Override
public void close() {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index f6346539d8..3b4c8d4fb6 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -74,6 +74,7 @@ import
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
@@ -104,6 +105,7 @@ import
org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;
@@ -275,6 +277,8 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
Consumer<LongFunction<CompletableFuture<?>>> revisionUpdater = c ->
metaStorageManager.registerRevisionUpdateListener(c::apply);
+ PlacementDriver placementDriver = new TestPlacementDriver(node);
+
lowWatermark = new TestLowWatermark();
lowWatermark.update(savedWatermark);
tableManager = new TableManager(
@@ -304,12 +308,13 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
new AlwaysSyncedSchemaSyncService(),
catalogManager,
new HybridTimestampTracker(),
- new TestPlacementDriver(node),
+ placementDriver,
() -> mock(IgniteSql.class),
new RemotelyTriggeredResourceRegistry(),
mock(ScheduledExecutorService.class),
lowWatermark,
- ForkJoinPool.commonPool()
+ ForkJoinPool.commonPool(),
+ new TransactionInflights(placementDriver)
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 9dee6283ab..00a4ce4480 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -127,6 +127,7 @@ import
org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.CursorUtils;
@@ -780,7 +781,8 @@ public class TableManagerTest extends IgniteAbstractTest {
new RemotelyTriggeredResourceRegistry(),
mock(ScheduledExecutorService.class),
lowWatermark,
- ForkJoinPool.commonPool()
+ ForkJoinPool.commonPool(),
+ mock(TransactionInflights.class)
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 5bb8b89b96..b8871d1007 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -46,6 +46,7 @@ import
org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
@@ -70,7 +71,8 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
mock(HybridClock.class),
new HybridTimestampTracker(),
mock(PlacementDriver.class),
- new TableRaftServiceImpl("test", 1, Int2ObjectMaps.emptyMap(),
new SingleClusterNodeResolver(mock(ClusterNode.class)))
+ new TableRaftServiceImpl("test", 1, Int2ObjectMaps.emptyMap(),
new SingleClusterNodeResolver(mock(ClusterNode.class))),
+ mock(TransactionInflights.class)
);
// Let's check the empty table.
@@ -116,7 +118,8 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
mock(HybridClock.class),
new HybridTimestampTracker(),
mock(PlacementDriver.class),
- new TableRaftServiceImpl("test", 3, Int2ObjectMaps.emptyMap(),
new SingleClusterNodeResolver(mock(ClusterNode.class)))
+ new TableRaftServiceImpl("test", 3, Int2ObjectMaps.emptyMap(),
new SingleClusterNodeResolver(mock(ClusterNode.class))),
+ mock(TransactionInflights.class)
);
List<BinaryRowEx> originalRows = List.of(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 7b9d20e114..ee9fa70414 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -136,6 +136,7 @@ import
org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
@@ -202,8 +203,12 @@ public class ItTxTestCluster {
protected Map<String, TxManager> txManagers;
+ protected Map<String, TransactionInflights> txInflights;
+
protected Map<String, RemotelyTriggeredResourceRegistry> cursorRegistries;
+ private TransactionInflights clientTransactionInflights;
+
protected TxManager clientTxManager;
protected TransactionStateResolver clientTxStateResolver;
@@ -334,6 +339,7 @@ public class ItTxTestCluster {
replicaManagers = new HashMap<>(nodes);
replicaServices = new HashMap<>(nodes);
txManagers = new HashMap<>(nodes);
+ txInflights = new HashMap<>(nodes);
cursorRegistries = new HashMap<>(nodes);
txStateStorages = new HashMap<>(nodes);
@@ -398,11 +404,16 @@ public class ItTxTestCluster {
var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
+ TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
+
+ txInflights.put(node.name(), transactionInflights);
+
ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
node.name(),
resourcesRegistry,
clusterService.topologyService(),
- clusterService.messagingService()
+ clusterService.messagingService(),
+ transactionInflights
);
cursorRegistries.put(node.name(), resourcesRegistry);
@@ -415,7 +426,8 @@ public class ItTxTestCluster {
node,
placementDriver,
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
);
txMgr.start();
@@ -436,6 +448,7 @@ public class ItTxTestCluster {
} else {
// Collocated mode.
clientTxManager = txManagers.get(localNodeName);
+ clientTransactionInflights = txInflights.get(localNodeName);
}
igniteTransactions = new IgniteTransactionsImpl(clientTxManager,
timestampTracker);
@@ -451,7 +464,8 @@ public class ItTxTestCluster {
ClusterNode node,
PlacementDriver placementDriver,
RemotelyTriggeredResourceRegistry resourcesRegistry,
- ResourceCleanupManager resourceCleanupManager
+ ResourceCleanupManager resourceCleanupManager,
+ TransactionInflights transactionInflights
) {
return new TxManagerImpl(
node.name(),
@@ -467,7 +481,8 @@ public class ItTxTestCluster {
new TestLocalRwTxCounter(),
partitionOperationsExecutor,
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
);
}
@@ -702,7 +717,8 @@ public class ItTxTestCluster {
startClient ? clientClock : clocks.get(localNodeName),
timestampTracker,
placementDriver,
- new TableRaftServiceImpl(tableName, 1, clients,
nodeResolver)
+ new TableRaftServiceImpl(tableName, 1, clients,
nodeResolver),
+ clientTransactionInflights
),
new DummySchemaManagerImpl(schemaDescriptor),
clientTxManager.lockManager(),
@@ -924,11 +940,14 @@ public class ItTxTestCluster {
private void initializeClientTxComponents() {
RemotelyTriggeredResourceRegistry resourceRegistry = new
RemotelyTriggeredResourceRegistry();
+ clientTransactionInflights = new TransactionInflights(placementDriver);
+
ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
"client",
resourceRegistry,
client.topologyService(),
- client.messagingService()
+ client.messagingService(),
+ clientTransactionInflights
);
clientTxManager = new TxManagerImpl(
@@ -945,7 +964,8 @@ public class ItTxTestCluster {
new TestLocalRwTxCounter(),
partitionOperationsExecutor,
resourceRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ clientTransactionInflights
);
clientTxStateResolver = new TransactionStateResolver(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 9c26b73ece..c9bba46a80 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
@@ -164,14 +165,9 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
this(
replicaSvc,
new TestMvPartitionStorage(0),
- false,
- null,
schema,
- new HybridTimestampTracker(),
- new TestPlacementDriver(LOCAL_NODE),
- storageUpdateConfiguration,
txConfiguration,
- new RemotelyTriggeredResourceRegistry()
+ storageUpdateConfiguration
);
}
@@ -201,7 +197,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
new TestPlacementDriver(LOCAL_NODE),
storageUpdateConfiguration,
txConfiguration,
- new RemotelyTriggeredResourceRegistry()
+ new RemotelyTriggeredResourceRegistry(),
+ new TransactionInflights(new TestPlacementDriver(LOCAL_NODE))
);
}
@@ -228,7 +225,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
PlacementDriver placementDriver,
StorageUpdateConfiguration storageUpdateConfiguration,
TransactionConfiguration txConfiguration,
- RemotelyTriggeredResourceRegistry resourcesRegistry
+ RemotelyTriggeredResourceRegistry resourcesRegistry,
+ TransactionInflights transactionInflights
) {
super(
"test",
@@ -247,7 +245,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
1,
Int2ObjectMaps.singleton(PART_ID,
mock(RaftGroupService.class)),
new SingleClusterNodeResolver(LOCAL_NODE)
- )
+ ),
+ transactionInflights
);
RaftGroupService svc =
tableRaftService().partitionRaftGroupService(PART_ID);
@@ -460,11 +459,14 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
when(clusterService.messagingService()).thenReturn(new
DummyMessagingService(LOCAL_NODE));
when(clusterService.topologyService()).thenReturn(topologyService);
+ TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
+
ResourceCleanupManager resourceCleanupManager = new
ResourceCleanupManager(
LOCAL_NODE.name(),
resourcesRegistry,
clusterService.topologyService(),
- clusterService.messagingService()
+ clusterService.messagingService(),
+ transactionInflights
);
var txManager = new TxManagerImpl(
@@ -478,7 +480,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
() -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new TestLocalRwTxCounter(),
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
);
txManager.start();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index cbbd1516cf..ad41534ffe 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -191,21 +191,6 @@ public interface TxManager extends IgniteComponent {
*/
CompletableFuture<Void> updateLowWatermark(HybridTimestamp
newLowWatermark);
- /**
- * Registers the infligh update for a transaction.
- *
- * @param txId The transaction id.
- * @return {@code True} if the inflight was registered. The update must be
failed on false.
- */
- boolean addInflight(UUID txId);
-
- /**
- * Unregisters the inflight for a transaction.
- *
- * @param txId The transction id
- */
- void removeInflight(UUID txId);
-
/** Returns the node's hybrid clock. */
HybridClock clock();
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
index b751d7faf3..bac0426475 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
@@ -18,13 +18,10 @@
package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.stream.Collectors.toSet;
import java.util.Collection;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
@@ -35,60 +32,59 @@ import org.apache.ignite.network.TopologyService;
* Keeps track of all finished RO transactions.
*/
public class FinishedReadOnlyTransactionTracker {
-
- private static final int MAX_FINISHED_TRANSACTIONS_IN_BATCH = 10_000;
-
/** Tx messages factory. */
private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
- /** A collection of finished read only transactions ordered by the time
when when they were finished. */
- private final Collection<UUID> finishedTransactions = new
LinkedBlockingQueue<>();
-
/** Topology service. */
private final TopologyService topologyService;
/** Messaging service. */
private final MessagingService messagingService;
+ /** Transaction inflights. */
+ private final TransactionInflights transactionInflights;
+
/**
* Constructor.
*
* @param topologyService Topology service.
* @param messagingService Messaging service.
+ * @param transactionInflights Transaction inflights.
*/
- public FinishedReadOnlyTransactionTracker(TopologyService topologyService,
MessagingService messagingService) {
+ public FinishedReadOnlyTransactionTracker(
+ TopologyService topologyService,
+ MessagingService messagingService,
+ TransactionInflights transactionInflights
+ ) {
this.topologyService = topologyService;
this.messagingService = messagingService;
+ this.transactionInflights = transactionInflights;
}
/**
* Send close cursors batch message to all cluster nodes.
*/
public void broadcastClosedTransactions() {
- if (finishedTransactions.isEmpty()) {
- return;
+ Collection<UUID> txToSend =
transactionInflights.finishedReadOnlyTransactions();
+
+ if (!txToSend.isEmpty()) {
+ FinishedTransactionsBatchMessage message =
FACTORY.finishedTransactionsBatchMessage()
+ .transactions(txToSend)
+ .build();
+
+ CompletableFuture<?>[] messages = topologyService.allMembers()
+ .stream()
+ .map(clusterNode -> sendCursorCleanupCommand(clusterNode,
message))
+ .toArray(CompletableFuture[]::new);
+ allOf(messages).thenRun(() ->
transactionInflights.removeTxContexts(txToSend));
}
-
- Set<UUID> txToSend = finishedTransactions.stream()
- .limit(MAX_FINISHED_TRANSACTIONS_IN_BATCH)
- .collect(toSet());
-
- FinishedTransactionsBatchMessage message =
FACTORY.finishedTransactionsBatchMessage()
- .transactions(txToSend)
- .build();
-
- CompletableFuture<?>[] messages = topologyService.allMembers()
- .stream()
- .map(clusterNode -> sendCursorCleanupCommand(clusterNode,
message))
- .toArray(CompletableFuture[]::new);
- allOf(messages).thenRun(() ->
finishedTransactions.removeAll(txToSend));
}
private CompletableFuture<Void> sendCursorCleanupCommand(ClusterNode node,
FinishedTransactionsBatchMessage message) {
return messagingService.send(node, message);
}
- public void onTransactionFinished(UUID id) {
- finishedTransactions.add(id);
+ void onTransactionFinished(UUID id) {
+ transactionInflights.markReadOnlyTxFinished(id);
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index b73987e238..34affae0b7 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -41,9 +41,6 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
/** The tracker is used to track an observable timestamp. */
private final HybridTimestampTracker observableTsTracker;
- /** Cleanup manager for tracking closed transactions. */
- private final ResourceCleanupManager resourceCleanupManager;
-
/**
* The constructor.
*
@@ -58,14 +55,12 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
HybridTimestampTracker observableTsTracker,
UUID id,
String txCoordinatorId,
- HybridTimestamp readTimestamp,
- ResourceCleanupManager resourceCleanupManager
+ HybridTimestamp readTimestamp
) {
super(txManager, id, txCoordinatorId);
this.readTimestamp = readTimestamp;
this.observableTsTracker = observableTsTracker;
- this.resourceCleanupManager = resourceCleanupManager;
}
@Override
@@ -119,8 +114,6 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
return nullCompletedFuture();
}
- resourceCleanupManager.onTransactionFinished(id());
-
observableTsTracker.update(executionTimestamp);
return ((TxManagerImpl)
txManager).completeReadOnlyTransactionFuture(new
TxIdAndTimestamp(readTimestamp, id()));
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
index 55cce511c6..cb51853794 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
@@ -74,12 +74,14 @@ public class ResourceCleanupManager implements
IgniteComponent {
* @param resourceRegistry Resources registry.
* @param topologyService Topology service.
* @param messagingService Messaging service.
+ * @param transactionInflights Transaction inflights.
*/
public ResourceCleanupManager(
String nodeName,
RemotelyTriggeredResourceRegistry resourceRegistry,
TopologyService topologyService,
- MessagingService messagingService
+ MessagingService messagingService,
+ TransactionInflights transactionInflights
) {
this.resourceRegistry = resourceRegistry;
this.clusterNodeResolver = topologyService;
@@ -87,7 +89,11 @@ public class ResourceCleanupManager implements
IgniteComponent {
RESOURCE_CLEANUP_EXECUTOR_SIZE,
NamedThreadFactory.create(nodeName,
"resource-cleanup-executor", LOG)
);
- this.finishedReadOnlyTransactionTracker = new
FinishedReadOnlyTransactionTracker(topologyService, messagingService);
+ this.finishedReadOnlyTransactionTracker = new
FinishedReadOnlyTransactionTracker(
+ topologyService,
+ messagingService,
+ transactionInflights
+ );
this.finishedTransactionBatchRequestHandler =
new FinishedTransactionBatchRequestHandler(messagingService,
resourceRegistry, resourceCleanupExecutor);
}
@@ -120,7 +126,12 @@ public class ResourceCleanupManager implements
IgniteComponent {
shutdownAndAwaitTermination(resourceCleanupExecutor, 10,
TimeUnit.SECONDS);
}
- public void onTransactionFinished(UUID id) {
+ /**
+ * Called when a read-only transaction finishes.
+ *
+ * @param id Transaction id.
+ */
+ void onReadOnlyTransactionFinished(UUID id) {
finishedReadOnlyTransactionTracker.onTransactionFinished(id);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
new file mode 100644
index 0000000000..b303532fe9
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_PRIMARY_REPLICA_EXPIRED_ERR;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
+import org.apache.ignite.internal.tx.TransactionResult;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains counters for in-flight requests of the transactions. Read-write transactions can't finish when some requests are in-flight.
+ * Read-only transactions can't be included into {@link org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage} when
+ * some requests are in-flight.
+ *
+ * <p>All counter updates go through {@link ConcurrentHashMap#compute}, which serializes them per transaction id.
+ */
+public class TransactionInflights {
+    /** Hint for maximum concurrent txns. */
+    private static final int MAX_CONCURRENT_TXNS = 1024;
+
+    /** Txn contexts. */
+    private final ConcurrentHashMap<UUID, TxContext> txCtxMap = new ConcurrentHashMap<>(MAX_CONCURRENT_TXNS);
+
+    /** Placement driver, handed to every read-write context so that it can check the current leases on commit. */
+    private final PlacementDriver placementDriver;
+
+    /**
+     * Constructor.
+     *
+     * @param placementDriver Placement driver.
+     */
+    public TransactionInflights(PlacementDriver placementDriver) {
+        this.placementDriver = placementDriver;
+    }
+
+    /**
+     * Registers the inflight update for a transaction. The transaction context is created on the first call for the given id.
+     *
+     * @param txId The transaction id.
+     * @param readOnly Whether the transaction is read-only.
+     * @return {@code True} if the inflight was registered, {@code false} if the transaction is already finishing. The update must
+     *     be failed on {@code false}.
+     */
+    public boolean addInflight(UUID txId, boolean readOnly) {
+        // Single-element array lets the lambda report the result back to the caller.
+        boolean[] res = {true};
+
+        txCtxMap.compute(txId, (uuid, ctx) -> {
+            if (ctx == null) {
+                ctx = readOnly ? new ReadOnlyTxContext() : new ReadWriteTxContext(placementDriver);
+            }
+
+            res[0] = ctx.addInflight();
+
+            return ctx;
+        });
+
+        return res[0];
+    }
+
+    /**
+     * Unregisters the inflight for a transaction.
+     *
+     * @param txId The transaction id.
+     */
+    public void removeInflight(UUID txId) {
+        TxContext tuple = txCtxMap.compute(txId, (uuid, ctx) -> {
+            assert ctx != null : format("No tx context found on removing inflight [txId={}]", txId);
+
+            ctx.removeInflight(txId);
+
+            return ctx;
+        });
+
+        // Avoid completion under lock.
+        tuple.onInflightsRemoved();
+    }
+
+    /**
+     * Collects read-only transactions that are marked as finished and have no inflights left.
+     *
+     * @return Ids of such transactions.
+     */
+    Collection<UUID> finishedReadOnlyTransactions() {
+        return txCtxMap.entrySet().stream()
+                .filter(e -> e.getValue() instanceof ReadOnlyTxContext && e.getValue().isReadyToFinish())
+                .map(Entry::getKey)
+                .collect(toSet());
+    }
+
+    /**
+     * Drops the contexts of the given transactions.
+     *
+     * @param txIds Transaction ids.
+     */
+    void removeTxContexts(Collection<UUID> txIds) {
+        txCtxMap.keySet().removeAll(txIds);
+    }
+
+    /**
+     * Fails the inflights wait of every finishing read-write transaction that has the given group enlisted.
+     *
+     * @param groupId Replication group id.
+     */
+    void cancelWaitingInflights(TablePartitionId groupId) {
+        for (Map.Entry<UUID, TxContext> ctxEntry : txCtxMap.entrySet()) {
+            if (ctxEntry.getValue() instanceof ReadWriteTxContext) {
+                ReadWriteTxContext txContext = (ReadWriteTxContext) ctxEntry.getValue();
+
+                if (txContext.isTxFinishing()) {
+                    IgniteBiTuple<ClusterNode, Long> nodeAndToken = txContext.enlistedGroups.get(groupId);
+
+                    if (nodeAndToken != null) {
+                        txContext.cancelWaitingInflights(groupId, nodeAndToken.get2());
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Marks the read-only transaction as finished, so that no new inflights can be added to it. Creates the context if the
+     * transaction has not registered any inflight yet.
+     *
+     * @param txId Transaction id.
+     */
+    void markReadOnlyTxFinished(UUID txId) {
+        txCtxMap.compute(txId, (k, ctx) -> {
+            if (ctx == null) {
+                ctx = new ReadOnlyTxContext();
+            }
+
+            ctx.finishTx(null);
+
+            return ctx;
+        });
+    }
+
+    /**
+     * Switches the read-write transaction to the finishing state, so that no new inflights can be added to it. Creates the context
+     * if the transaction has not registered any inflight yet.
+     *
+     * @param txId Transaction id.
+     * @param enlistedGroups Enlisted groups with the primary node and the enlistment consistency token for each of them.
+     * @return Context of the transaction.
+     */
+    ReadWriteTxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups) {
+        return (ReadWriteTxContext) txCtxMap.compute(txId, (uuid, tuple0) -> {
+            if (tuple0 == null) {
+                tuple0 = new ReadWriteTxContext(placementDriver); // No writes enlisted.
+            }
+
+            assert !tuple0.isTxFinishing() : "Transaction is already finished [id=" + uuid + "].";
+
+            tuple0.finishTx(enlistedGroups);
+
+            return tuple0;
+        });
+    }
+
+    /** Base class for the per-transaction inflights state. */
+    abstract static class TxContext {
+        volatile long inflights = 0; // Updated under lock.
+
+        /**
+         * Increments the inflights counter unless the transaction is finishing.
+         *
+         * @return {@code True} if the counter was incremented.
+         */
+        boolean addInflight() {
+            if (isTxFinishing()) {
+                return false;
+            } else {
+                // noinspection NonAtomicOperationOnVolatileField
+                inflights++;
+                return true;
+            }
+        }
+
+        /**
+         * Decrements the inflights counter.
+         *
+         * @param txId Transaction id, used in the assertion message only.
+         */
+        void removeInflight(UUID txId) {
+            assert inflights > 0 : format("No inflights, cannot remove any [txId={}, ctx={}]", txId, this);
+
+            // noinspection NonAtomicOperationOnVolatileField
+            inflights--;
+        }
+
+        /** Callback that is invoked after an inflight was removed, outside of the map lock. */
+        abstract void onInflightsRemoved();
+
+        /**
+         * Switches the context to the finishing state.
+         *
+         * @param enlistedGroups Enlisted groups, {@code null} is passed for read-only transactions.
+         */
+        abstract void finishTx(@Nullable Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups);
+
+        /** Returns {@code true} if the finish was requested, new inflights are rejected in this state. */
+        abstract boolean isTxFinishing();
+
+        /** Returns {@code true} if nothing prevents the transaction from being finished. */
+        abstract boolean isReadyToFinish();
+    }
+
+    /** Context of a read-only transaction. */
+    private static class ReadOnlyTxContext extends TxContext {
+        private volatile boolean markedFinished;
+
+        @Override
+        public void onInflightsRemoved() {
+            // No-op.
+        }
+
+        @Override
+        public void finishTx(@Nullable Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups) {
+            // Enlisted groups are not tracked for read-only transactions.
+            markedFinished = true;
+        }
+
+        @Override
+        public boolean isTxFinishing() {
+            return markedFinished;
+        }
+
+        @Override
+        public boolean isReadyToFinish() {
+            return markedFinished && inflights == 0;
+        }
+
+        @Override
+        public String toString() {
+            return "ReadOnlyTxContext [inflights=" + inflights + ']';
+        }
+    }
+
+    /** Context of a read-write transaction. */
+    static class ReadWriteTxContext extends TxContext {
+        /** Completes when there are no inflights left, or fails if the wait was cancelled. */
+        private final CompletableFuture<Void> waitRepFut = new CompletableFuture<>();
+        private final PlacementDriver placementDriver;
+        /** Not {@code null} since the moment the finish was requested. */
+        private volatile CompletableFuture<Void> finishInProgressFuture = null;
+        private volatile Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups;
+
+        private ReadWriteTxContext(PlacementDriver placementDriver) {
+            this.placementDriver = placementDriver;
+        }
+
+        /**
+         * Waits until the transaction is ready to finish and runs the finish action. The action is always invoked: it receives
+         * {@code true} only if a commit was requested and the wait succeeded, and {@code false} otherwise.
+         *
+         * @param commit Whether a commit was requested.
+         * @param finishAction Finish action, accepts the actual commit flag.
+         * @return Future that is completed after the finish action is done.
+         */
+        CompletableFuture<Void> performFinish(boolean commit, Function<Boolean, CompletableFuture<Void>> finishAction) {
+            waitReadyToFinish(commit).whenComplete((ignoredReadyToFinish, readyException) -> {
+                try {
+                    finishAction.apply(commit && readyException == null)
+                            .whenComplete((ignoredFinishActionResult, finishException) ->
+                                    completeFinishInProgressFuture(commit, readyException, finishException));
+                } catch (Throwable t) {
+                    // The future returned by the outer whenComplete is discarded, so a synchronous failure of the finish action
+                    // (or a null future returned by it) would be lost and the finish future would never complete.
+                    completeFinishInProgressFuture(commit, readyException, t);
+                }
+            });
+
+            return finishInProgressFuture;
+        }
+
+        /**
+         * Completes the finish future. A failure of the wait takes precedence over a failure of the finish action.
+         *
+         * @param commit Whether a commit was requested.
+         * @param readyToFinishException Failure of the wait, if any.
+         * @param finishException Failure of the finish action, if any.
+         */
+        private void completeFinishInProgressFuture(
+                boolean commit,
+                @Nullable Throwable readyToFinishException,
+                @Nullable Throwable finishException
+        ) {
+            if (readyToFinishException == null) {
+                if (finishException == null) {
+                    finishInProgressFuture.complete(null);
+                } else {
+                    finishInProgressFuture.completeExceptionally(finishException);
+                }
+            } else {
+                if (commit && readyToFinishException instanceof PrimaryReplicaExpiredException) {
+                    // The finish action was invoked with commit=false in this case, so the caller is told about the mismatch.
+                    finishInProgressFuture.completeExceptionally(new MismatchingTransactionOutcomeException(
+                            TX_PRIMARY_REPLICA_EXPIRED_ERR,
+                            "Failed to commit the transaction.",
+                            new TransactionResult(ABORTED, null),
+                            readyToFinishException
+                    ));
+                } else {
+                    finishInProgressFuture.completeExceptionally(readyToFinishException);
+                }
+            }
+        }
+
+        /**
+         * On commit, checks that the lease of every enlisted group still has the start time equal to the enlistment consistency
+         * token and then waits for the inflights. On rollback returns a completed future right away.
+         *
+         * @param commit Whether a commit was requested.
+         * @return Future of the wait.
+         */
+        private CompletableFuture<Void> waitReadyToFinish(boolean commit) {
+            if (commit) {
+                for (Map.Entry<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> e : enlistedGroups.entrySet()) {
+                    ReplicaMeta replicaMeta = placementDriver.currentLease(e.getKey());
+
+                    Long enlistmentConsistencyToken = e.getValue().get2();
+
+                    if (replicaMeta == null || !enlistmentConsistencyToken.equals(replicaMeta.getStartTime().longValue())) {
+                        return failedFuture(new PrimaryReplicaExpiredException(e.getKey(), enlistmentConsistencyToken, null, replicaMeta));
+                    }
+                }
+
+                return waitNoInflights();
+            } else {
+                return nullCompletedFuture();
+            }
+        }
+
+        /** Returns the inflights future, completing it first if there are no inflights at the moment. */
+        private CompletableFuture<Void> waitNoInflights() {
+            if (inflights == 0) {
+                waitRepFut.complete(null);
+            }
+            return waitRepFut;
+        }
+
+        /**
+         * Fails the inflights wait with {@link PrimaryReplicaExpiredException}.
+         *
+         * @param groupId Replication group id.
+         * @param enlistmentConsistencyToken Enlistment consistency token of the group.
+         */
+        void cancelWaitingInflights(TablePartitionId groupId, Long enlistmentConsistencyToken) {
+            waitRepFut.completeExceptionally(new PrimaryReplicaExpiredException(groupId, enlistmentConsistencyToken, null, null));
+        }
+
+        @Override
+        public void onInflightsRemoved() {
+            // The wait is released only after the finish was requested, new inflights are rejected from that moment.
+            if (inflights == 0 && finishInProgressFuture != null) {
+                waitRepFut.complete(null);
+            }
+        }
+
+        @Override
+        public void finishTx(Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups) {
+            // The groups are published before the future, so a context seen as finishing always has them set.
+            this.enlistedGroups = enlistedGroups;
+            finishInProgressFuture = new CompletableFuture<>();
+        }
+
+        @Override
+        public boolean isTxFinishing() {
+            return finishInProgressFuture != null;
+        }
+
+        @Override
+        public boolean isReadyToFinish() {
+            // True for both normal and exceptional completion of the wait.
+            return waitRepFut.isDone();
+        }
+
+        @Override
+        public String toString() {
+            return "ReadWriteTxContext [inflights=" + inflights + ", waitRepFut=" + waitRepFut
+                    + ", finishFut=" + finishInProgressFuture + ']';
+        }
+    }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index e93463fc80..2f0c40e4de 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -22,6 +22,7 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.function.Function.identity;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
@@ -34,7 +35,6 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
-import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_PRIMARY_REPLICA_EXPIRED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
import java.io.IOException;
@@ -45,7 +45,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
@@ -66,7 +65,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ClusterService;
@@ -74,7 +72,6 @@ import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -100,6 +97,7 @@ import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.TxStateMetaFinishing;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
+import
org.apache.ignite.internal.tx.impl.TransactionInflights.ReadWriteTxContext;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -117,9 +115,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(TxManagerImpl.class);
- /** Hint for maximum concurrent txns. */
- private static final int MAX_CONCURRENT_TXNS = 1024;
-
/** Transaction configuration. */
private final TransactionConfiguration txConfig;
@@ -138,9 +133,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
/** The local state storage. */
private final VolatileTxStateMetaStorage txStateVolatileStorage = new
VolatileTxStateMetaStorage();
- /** Txn contexts. */
- private final ConcurrentHashMap<UUID, TxContext> txCtxMap = new
ConcurrentHashMap<>(MAX_CONCURRENT_TXNS);
-
/** Future of a read-only transaction by it {@link TxIdAndTimestamp}. */
private final ConcurrentNavigableMap<TxIdAndTimestamp,
CompletableFuture<Void>> readOnlyTxFutureById = new ConcurrentSkipListMap<>(
Comparator.comparing(TxIdAndTimestamp::getReadTimestamp).thenComparing(TxIdAndTimestamp::getTxId)
@@ -208,6 +200,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
/** Cleanup manager for tx resources. */
private final ResourceCleanupManager resourceCleanupManager;
+ private final TransactionInflights transactionInflights;
+
/**
* Test-only constructor.
*
@@ -221,6 +215,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
* @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe
time propagation period in ms.
* @param localRwTxCounter Counter of read-write transactions that were
created and completed locally on the node.
* @param resourcesRegistry Resources registry.
+ * @param transactionInflights Transaction inflights.
*/
@TestOnly
public TxManagerImpl(
@@ -234,7 +229,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
LongSupplier idleSafeTimePropagationPeriodMsSupplier,
LocalRwTxCounter localRwTxCounter,
RemotelyTriggeredResourceRegistry resourcesRegistry,
- ResourceCleanupManager resourceCleanupManager
+ ResourceCleanupManager resourceCleanupManager,
+ TransactionInflights transactionInflights
) {
this(
clusterService.nodeName(),
@@ -250,7 +246,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
localRwTxCounter,
ForkJoinPool.commonPool(),
resourcesRegistry,
- resourceCleanupManager
+ resourceCleanupManager,
+ transactionInflights
);
}
@@ -269,6 +266,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
* @param localRwTxCounter Counter of read-write transactions that were
created and completed locally on the node.
* @param partitionOperationsExecutor Executor on which partition
operations will be executed, if needed.
* @param resourcesRegistry Resources registry.
+ * @param transactionInflights Transaction inflights.
*/
public TxManagerImpl(
String nodeName,
@@ -284,7 +282,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
LocalRwTxCounter localRwTxCounter,
Executor partitionOperationsExecutor,
RemotelyTriggeredResourceRegistry resourcesRegistry,
- ResourceCleanupManager resourceCleanupManager
+ ResourceCleanupManager resourceCleanupManager,
+ TransactionInflights transactionInflights
) {
this.txConfig = txConfig;
this.lockManager = lockManager;
@@ -298,6 +297,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
this.localRwTxCounter = localRwTxCounter;
this.partitionOperationsExecutor = partitionOperationsExecutor;
this.resourceCleanupManager = resourceCleanupManager;
+ this.transactionInflights = transactionInflights;
placementDriverHelper = new PlacementDriverHelper(placementDriver,
clock);
@@ -337,17 +337,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
TablePartitionId groupId = (TablePartitionId)
eventParameters.groupId();
- for (Map.Entry<UUID, TxContext> ctxEntry : txCtxMap.entrySet()) {
- TxContext txContext = ctxEntry.getValue();
-
- if (txContext.isTxFinishing()) {
- IgniteBiTuple<ClusterNode, Long> nodeAndToken =
txContext.enlistedGroups.get(groupId);
-
- if (nodeAndToken != null) {
- txContext.cancelWaitingInflights(groupId,
nodeAndToken.get2());
- }
- }
- }
+ transactionInflights.cancelWaitingInflights(groupId);
return falseCompletedFuture();
});
@@ -409,7 +399,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
);
}
- return new ReadOnlyTransactionImpl(this, timestampTracker, txId,
localNodeId, readTimestamp, resourceCleanupManager);
+ return new ReadOnlyTransactionImpl(this, timestampTracker, txId,
localNodeId, readTimestamp);
}
/**
@@ -511,7 +501,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
}
}
- TxContext txContext = lockTxForNewUpdates(txId, enlistedGroups);
+ ReadWriteTxContext txContext =
transactionInflights.lockTxForNewUpdates(txId, enlistedGroups);
// Wait for commit acks first, then proceed with the finish request.
return txContext.performFinish(commitIntent, commit ->
@@ -530,20 +520,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
});
}
- private TxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId,
IgniteBiTuple<ClusterNode, Long>> enlistedGroups) {
- return txCtxMap.compute(txId, (uuid, tuple0) -> {
- if (tuple0 == null) {
- tuple0 = new TxContext(placementDriver); // No writes enlisted.
- }
-
- assert !tuple0.isTxFinishing() : "Transaction is already finished
[id=" + uuid + "].";
-
- tuple0.finishTx(enlistedGroups);
-
- return tuple0;
- });
- }
-
private static CompletableFuture<Void> checkTxOutcome(boolean commit, UUID
txId, TransactionMeta stateMeta) {
if ((stateMeta.txState() == COMMITTED) == commit) {
return nullCompletedFuture();
@@ -810,6 +786,10 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
readOnlyTxFuture.complete(null);
+ UUID txId = txIdAndTimestamp.getTxId();
+
+ resourceCleanupManager.onReadOnlyTransactionFinished(txId);
+
return readOnlyTxFuture;
}
@@ -833,44 +813,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
return allOf(readOnlyTxFutures.toArray(CompletableFuture[]::new));
}
- @Override
- public boolean addInflight(UUID txId) {
- boolean[] res = {true};
-
- txCtxMap.compute(txId, (uuid, tuple) -> {
- if (tuple == null) {
- tuple = new TxContext(placementDriver);
- }
-
- if (tuple.isTxFinishing()) {
- res[0] = false;
- return tuple;
- } else {
- // noinspection NonAtomicOperationOnVolatileField
- tuple.inflights++;
- }
-
- return tuple;
- });
-
- return res[0];
- }
-
- @Override
- public void removeInflight(UUID txId) {
- TxContext tuple = txCtxMap.compute(txId, (uuid, ctx) -> {
- assert ctx != null && ctx.inflights > 0 : ctx;
-
- // noinspection NonAtomicOperationOnVolatileField
- ctx.inflights--;
-
- return ctx;
- });
-
- // Avoid completion under lock.
- tuple.onRemovedInflights();
- }
-
@Override
public HybridClock clock() {
return clock;
@@ -893,7 +835,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
Object result = response.result();
if (result instanceof UUID) {
- removeInflight((UUID) result);
+ transactionInflights.removeInflight((UUID) result);
}
}
@@ -929,7 +871,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
);
} else {
assert
commitTimestamp.compareTo(currentPrimaryReplica.getExpirationTime()) <= 0 :
- IgniteStringFormatter.format(
+ format(
"Commit timestamp is greater than
primary replica expiration timestamp:"
+ " [groupId = {}, commit
timestamp = {}, primary replica expiration timestamp = {}]",
groupId, commitTimestamp,
currentPrimaryReplica.getExpirationTime());
@@ -940,102 +882,6 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
return allOf(verificationFutures);
}
- private static class TxContext {
- volatile long inflights = 0; // Updated under lock.
- private final CompletableFuture<Void> waitRepFut = new
CompletableFuture<>();
- private final PlacementDriver placementDriver;
- volatile CompletableFuture<Void> finishInProgressFuture = null;
- volatile Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>>
enlistedGroups;
-
- private TxContext(PlacementDriver placementDriver) {
- this.placementDriver = placementDriver;
- }
-
- CompletableFuture<Void> performFinish(boolean commit,
Function<Boolean, CompletableFuture<Void>> finishAction) {
- waitReadyToFinish(commit)
- .whenComplete((ignoredReadyToFinish, readyException) ->
finishAction.apply(commit && readyException == null)
- .whenComplete((ignoredFinishActionResult,
finishException) ->
- completeFinishInProgressFuture(commit,
readyException, finishException))
- );
-
- return finishInProgressFuture;
- }
-
- private void completeFinishInProgressFuture(
- boolean commit,
- @Nullable Throwable readyToFinishException,
- @Nullable Throwable finishException
- ) {
- if (readyToFinishException == null) {
- if (finishException == null) {
- finishInProgressFuture.complete(null);
- } else {
-
finishInProgressFuture.completeExceptionally(finishException);
- }
- } else {
- if (commit && readyToFinishException instanceof
PrimaryReplicaExpiredException) {
- finishInProgressFuture.completeExceptionally(new
MismatchingTransactionOutcomeException(
- TX_PRIMARY_REPLICA_EXPIRED_ERR,
- "Failed to commit the transaction.",
- new TransactionResult(ABORTED, null),
- readyToFinishException
- ));
- } else {
-
finishInProgressFuture.completeExceptionally(readyToFinishException);
- }
- }
- }
-
- private CompletableFuture<Void> waitReadyToFinish(boolean commit) {
- if (commit) {
- for (Map.Entry<TablePartitionId, IgniteBiTuple<ClusterNode,
Long>> e : enlistedGroups.entrySet()) {
- ReplicaMeta replicaMeta =
placementDriver.currentLease(e.getKey());
-
- Long enlistmentConsistencyToken = e.getValue().get2();
-
- if (replicaMeta == null ||
!enlistmentConsistencyToken.equals(replicaMeta.getStartTime().longValue())) {
- return failedFuture(new
PrimaryReplicaExpiredException(e.getKey(), enlistmentConsistencyToken, null,
replicaMeta));
- }
- }
-
- return waitNoInflights();
- } else {
- return nullCompletedFuture();
- }
- }
-
- private CompletableFuture<Void> waitNoInflights() {
- if (inflights == 0) {
- waitRepFut.complete(null);
- }
- return waitRepFut;
- }
-
- private void cancelWaitingInflights(TablePartitionId groupId, Long
enlistmentConsistencyToken) {
- waitRepFut.completeExceptionally(new
PrimaryReplicaExpiredException(groupId, enlistmentConsistencyToken, null,
null));
- }
-
- void onRemovedInflights() {
- if (inflights == 0 && finishInProgressFuture != null) {
- waitRepFut.complete(null);
- }
- }
-
- void finishTx(Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>>
enlistedGroups) {
- this.enlistedGroups = enlistedGroups;
- finishInProgressFuture = new CompletableFuture<>();
- }
-
- boolean isTxFinishing() {
- return finishInProgressFuture != null;
- }
-
- @Override
- public String toString() {
- return "TxContext [inflights=" + inflights + ", waitRepFut=" +
waitRepFut + ", finishFut=" + finishInProgressFuture + ']';
- }
- }
-
static class TransactionFailureHandler {
private static final Set<Class<? extends Throwable>> RECOVERABLE =
Set.of(
TimeoutException.class,
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 6383acadcf..538c432335 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -78,6 +78,7 @@ import
org.apache.ignite.internal.tx.impl.PrimaryReplicaExpiredException;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
@@ -143,11 +144,14 @@ public class TxManagerTest extends IgniteAbstractTest {
RemotelyTriggeredResourceRegistry resourceRegistry = new
RemotelyTriggeredResourceRegistry();
+ TransactionInflights transactionInflights = new
TransactionInflights(placementDriver);
+
ResourceCleanupManager cleanupManager = new ResourceCleanupManager(
LOCAL_NODE.name(),
resourceRegistry,
clusterService.topologyService(),
- clusterService.messagingService()
+ clusterService.messagingService(),
+ transactionInflights
);
txManager = new TxManagerImpl(
@@ -161,7 +165,8 @@ public class TxManagerTest extends IgniteAbstractTest {
idleSafeTimePropagationPeriodMsSupplier,
localRwTxCounter,
resourceRegistry,
- cleanupManager
+ cleanupManager,
+ transactionInflights
);
txManager.start();
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index 0b97d676bd..5850292bfc 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -36,9 +36,6 @@ class ReadOnlyTransactionImplTest extends
BaseIgniteAbstractTest {
@Mock
private TxManagerImpl txManager;
- @Mock
- private ResourceCleanupManager resourceCleanupManager;
-
@Test
void effectiveSchemaTimestampIsReadTimestamp() {
HybridTimestamp readTimestamp = new HybridClockImpl().now();
@@ -49,8 +46,7 @@ class ReadOnlyTransactionImplTest extends
BaseIgniteAbstractTest {
new HybridTimestampTracker(),
txId,
"localId",
- readTimestamp,
- resourceCleanupManager
+ readTimestamp
);
assertThat(tx.startTimestamp(), is(readTimestamp));