This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c80510d0c2 IGNITE-21618 In-flights for read-only transactions (#3371)
c80510d0c2 is described below

commit c80510d0c225593de02f4d6592ecfcbb9837a535
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Mar 19 12:31:42 2024 +0300

    IGNITE-21618 In-flights for read-only transactions (#3371)
---
 check-rules/spotbugs-excludes.xml                  |  14 +-
 .../apache/ignite/client/fakes/FakeTxManager.java  |  10 -
 .../runner/app/ItIgniteNodeRestartTest.java        |  15 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  15 +-
 .../internal/sql/engine/SqlQueryProcessor.java     |  39 ++-
 .../exec/rel/TableScanNodeExecutionTest.java       |  16 +-
 .../apache/ignite/distributed/ItLockTableTest.java |   7 +-
 ...xDistributedTestSingleNodeNoCleanupMessage.java |   7 +-
 .../rebalance/ItRebalanceDistributedTest.java      |  12 +-
 .../ignite/internal/table/ItColocationTest.java    |  17 +-
 .../internal/table/distributed/TableManager.java   |  11 +-
 .../distributed/storage/InternalTableImpl.java     |  95 ++++++-
 .../distributed/TableManagerRecoveryTest.java      |   9 +-
 .../table/distributed/TableManagerTest.java        |   4 +-
 .../distributed/storage/InternalTableImplTest.java |   7 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |  34 ++-
 .../table/impl/DummyInternalTableImpl.java         |  25 +-
 .../org/apache/ignite/internal/tx/TxManager.java   |  15 -
 .../impl/FinishedReadOnlyTransactionTracker.java   |  52 ++--
 .../internal/tx/impl/ReadOnlyTransactionImpl.java  |   9 +-
 .../internal/tx/impl/ResourceCleanupManager.java   |  17 +-
 .../internal/tx/impl/TransactionInflights.java     | 316 +++++++++++++++++++++
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 198 ++-----------
 .../apache/ignite/internal/tx/TxManagerTest.java   |   9 +-
 .../tx/impl/ReadOnlyTransactionImplTest.java       |   6 +-
 25 files changed, 647 insertions(+), 312 deletions(-)

diff --git a/check-rules/spotbugs-excludes.xml b/check-rules/spotbugs-excludes.xml
index eb117abc40..c35cd0fc21 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -186,16 +186,22 @@
     <Class name="org.apache.ignite.internal.pagememory.persistence.checkpoint.Checkpointer"/>
     <Method name="cancel"/>
   </Match>
+  <Match>
+    <!-- duplicate reported because constants have same value -->
+    <Bug pattern="DB_DUPLICATE_BRANCHES"/>
+    <Source name="AbstractPageMemoryIndexStorage.java"/>
+  </Match>
   <Match>
     <!-- suppressed in code as well -->
     <Bug pattern="VO_VOLATILE_INCREMENT"/>
-    <Class name="org.apache.ignite.internal.tx.impl.TxManagerImpl"/>
+    <Class name="org.apache.ignite.internal.tx.impl.TransactionInflights$TxContext"/>
     <Field name="inflights"/>
   </Match>
   <Match>
-    <!-- duplicate reported because constants have same value -->
-    <Bug pattern="DB_DUPLICATE_BRANCHES"/>
-    <Source name="AbstractPageMemoryIndexStorage.java"/>
+    <!-- suppressed in code as well -->
+    <Bug pattern="VO_VOLATILE_DECREMENT"/>
+    <Class name="org.apache.ignite.internal.tx.impl.TransactionInflights$TxContext"/>
+    <Field name="inflights"/>
   </Match>
   <!-- end of false-positive exclusions -->
 
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 3fe97ca37f..97acd48bd9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -215,16 +215,6 @@ public class FakeTxManager implements TxManager {
         return null;
     }
 
-    @Override
-    public boolean addInflight(UUID txId) {
-        return false;
-    }
-
-    @Override
-    public void removeInflight(UUID txId) {
-        // No-op.
-    }
-
     @Override
     public HybridClock clock() {
         return clock;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a8afc088eb..6286d7efcb 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -179,6 +179,7 @@ import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
@@ -455,11 +456,14 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
 
         var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
 
+        TransactionInflights transactionInflights = new TransactionInflights(placementDriverManager.placementDriver());
+
         ResourceCleanupManager resourceCleanupManager = new ResourceCleanupManager(
                 name,
                 resourcesRegistry,
                 clusterSvc.topologyService(),
-                clusterSvc.messagingService()
+                clusterSvc.messagingService(),
+                transactionInflights
         );
 
         var txManager = new TxManagerImpl(
@@ -476,7 +480,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 new TestLocalRwTxCounter(),
                 threadPoolsManager.partitionOperationsExecutor(),
                 resourcesRegistry,
-                resourceCleanupManager
+                resourceCleanupManager,
+                transactionInflights
         );
 
         ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry();
@@ -574,7 +579,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 resourcesRegistry,
                 rebalanceScheduler,
                 lowWatermark,
-                ForkJoinPool.commonPool()
+                ForkJoinPool.commonPool(),
+                transactionInflights
         );
 
         var indexManager = new IndexManager(
@@ -605,7 +611,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 failureProcessor,
                 placementDriverManager.placementDriver(),
                 clusterConfigRegistry.getConfiguration(SqlDistributedConfiguration.KEY),
-                nodeCfgMgr.configurationRegistry().getConfiguration(SqlLocalConfiguration.KEY)
+                nodeCfgMgr.configurationRegistry().getConfiguration(SqlLocalConfiguration.KEY),
+                transactionInflights
         );
 
         sqlRef.set(new IgniteSqlImpl(name, qryEngine, new IgniteTransactionsImpl(txManager, new HybridTimestampTracker())));
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index ff2a4da14e..540ed9b62f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -201,6 +201,7 @@ import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.vault.VaultManager;
@@ -673,11 +674,14 @@ public class IgniteImpl implements Ignite {
 
         resourcesRegistry = new RemotelyTriggeredResourceRegistry();
 
+        TransactionInflights transactionInflights = new TransactionInflights(placementDriverMgr.placementDriver());
+
         resourceCleanupManager = new ResourceCleanupManager(
                 name,
                 resourcesRegistry,
                 clusterSvc.topologyService(),
-                messagingServiceReturningToStorageOperationsPool
+                messagingServiceReturningToStorageOperationsPool,
+                transactionInflights
         );
 
         // TODO: IGNITE-19344 - use nodeId that is validated on join (and probably generated differently).
@@ -695,7 +699,8 @@ public class IgniteImpl implements Ignite {
                 indexNodeFinishedRwTransactionsChecker,
                 threadPoolsManager.partitionOperationsExecutor(),
                 resourcesRegistry,
-                resourceCleanupManager
+                resourceCleanupManager,
+                transactionInflights
         );
 
         StorageUpdateConfiguration storageUpdateConfiguration = clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
@@ -734,7 +739,8 @@ public class IgniteImpl implements Ignite {
                 resourcesRegistry,
                 rebalanceScheduler,
                 lowWatermark,
-                asyncContinuationExecutor
+                asyncContinuationExecutor,
+                transactionInflights
         );
 
         indexManager = new IndexManager(
@@ -776,7 +782,8 @@ public class IgniteImpl implements Ignite {
                 failureProcessor,
                 placementDriverMgr.placementDriver(),
                 clusterConfigRegistry.getConfiguration(SqlDistributedConfiguration.KEY),
-                nodeConfigRegistry.getConfiguration(SqlLocalConfiguration.KEY)
+                nodeConfigRegistry.getConfiguration(SqlLocalConfiguration.KEY),
+                transactionInflights
         );
 
         sql = new IgniteSqlImpl(name, qryEngine, new IgniteTransactionsImpl(txManager, observableTimestampTracker));
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 5508017a73..2a276418de 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
@@ -26,11 +27,13 @@ import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFI
 import static org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -120,6 +123,7 @@ import org.apache.ignite.internal.systemview.api.SystemViewManager;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.internal.util.AsyncWrapper;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -130,6 +134,7 @@ import org.apache.ignite.lang.SchemaNotFoundException;
 import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.tx.IgniteTransactions;
+import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -224,6 +229,8 @@ public class SqlQueryProcessor implements QueryProcessor {
     /** Node SQL configuration. */
     private final SqlLocalConfiguration nodeCfg;
 
+    private final TransactionInflights transactionInflights;
+
     /** Constructor. */
     public SqlQueryProcessor(
             Consumer<LongFunction<CompletableFuture<?>>> registry,
@@ -242,7 +249,8 @@ public class SqlQueryProcessor implements QueryProcessor {
             FailureProcessor failureProcessor,
             PlacementDriver placementDriver,
             SqlDistributedConfiguration clusterCfg,
-            SqlLocalConfiguration nodeCfg
+            SqlLocalConfiguration nodeCfg,
+            TransactionInflights transactionInflights
     ) {
         this.clusterSrvc = clusterSrvc;
         this.logicalTopologyService = logicalTopologyService;
@@ -260,6 +268,7 @@ public class SqlQueryProcessor implements QueryProcessor {
         this.placementDriver = placementDriver;
         this.clusterCfg = clusterCfg;
         this.nodeCfg = nodeCfg;
+        this.transactionInflights = transactionInflights;
 
         sqlSchemaManager = new SqlSchemaManagerImpl(
                 catalogManager,
@@ -320,7 +329,7 @@ public class SqlQueryProcessor implements QueryProcessor {
             @Override
             public CompletableFuture<ExecutionTarget> forTable(ExecutionTargetFactory factory, IgniteTable table) {
                 return primaryReplicas(table)
-                        .thenApply(replicas -> factory.partitioned(replicas));
+                        .thenApply(factory::partitioned);
             }
 
             @Override
@@ -328,7 +337,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                 List<String> nodes = systemViewManager.owningNodes(view.name());
 
                 if (nullOrEmpty(nodes)) {
-                    return CompletableFuture.failedFuture(
+                    return failedFuture(
                             new SqlException(Sql.MAPPING_ERR, format("The view with name '{}' could not be found on"
                                     + " any active nodes in the cluster", view.name()))
                     );
@@ -568,7 +577,27 @@ public class SqlQueryProcessor implements QueryProcessor {
 
             QueryTransactionWrapper txWrapper = txCtx.getOrStartImplicit(result.queryType());
 
-            return executeParsedStatement(schemaName, result, txWrapper, queryCancel, timeZoneId, params, null);
+            InternalTransaction tx = txWrapper.unwrap();
+
+            // Adding inflights only for read-only transactions.
+            if (tx.isReadOnly() && !transactionInflights.addInflight(tx.id(), tx.isReadOnly())) {
+                return failedFuture(new TransactionException(
+                        TX_ALREADY_FINISHED_ERR, format("Transaction is already finished [tx={}]", tx)
+                ));
+            }
+
+            return executeParsedStatement(schemaName, result, txWrapper, queryCancel, timeZoneId, params, null)
+                    .handle((executionResult, e) -> {
+                        if (tx.isReadOnly()) {
+                            transactionInflights.removeInflight(txWrapper.unwrap().id());
+                        }
+
+                        if (e != null) {
+                            sneakyThrow(e);
+                        }
+
+                        return executionResult;
+                    });
         });
 
         // TODO IGNITE-20078 Improve (or remove) CancellationException handling.
@@ -672,7 +701,7 @@ public class SqlQueryProcessor implements QueryProcessor {
                 return schema;
             });
         } catch (Throwable t) {
-            return CompletableFuture.failedFuture(t);
+            return failedFuture(t);
         }
     }
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index b24e441924..6aa22be45f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
@@ -147,11 +148,16 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]>
 
             RemotelyTriggeredResourceRegistry resourcesRegistry = new RemotelyTriggeredResourceRegistry();
 
+            PlacementDriver placementDriver = new TestPlacementDriver(leaseholder, leaseholder);
+
+            TransactionInflights transactionInflights = new TransactionInflights(placementDriver);
+
             ResourceCleanupManager resourceCleanupManager = new ResourceCleanupManager(
                     leaseholder,
                     resourcesRegistry,
                     clusterService.topologyService(),
-                    clusterService.messagingService()
+                    clusterService.messagingService(),
+                    transactionInflights
             );
 
             TxManagerImpl txManager = new TxManagerImpl(
@@ -161,11 +167,12 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]>
                     new HeapLockManager(),
                     new HybridClockImpl(),
                     new TransactionIdGenerator(0xdeadbeef),
-                    new TestPlacementDriver(leaseholder, leaseholder),
+                    placementDriver,
                     () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                     new TestLocalRwTxCounter(),
                     resourcesRegistry,
-                    resourceCleanupManager
+                    resourceCleanupManager,
+                    transactionInflights
             );
 
             txManager.start();
@@ -245,7 +252,8 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]>
                             PART_CNT,
                             Int2ObjectMaps.singleton(0, mock(RaftGroupService.class)),
                             new SingleClusterNodeResolver(mock(ClusterNode.class))
-                    )
+                    ),
+                    mock(TransactionInflights.class)
             );
             this.dataAmount = dataAmount;
 
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
index 99296eb7de..f1470e47fd 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.type.NativeTypes;
@@ -133,7 +134,8 @@ public class ItLockTableTest extends IgniteAbstractTest {
                     ClusterNode node,
                     PlacementDriver placementDriver,
                     RemotelyTriggeredResourceRegistry resourcesRegistry,
-                    ResourceCleanupManager resourceCleanupManager
+                    ResourceCleanupManager resourceCleanupManager,
+                    TransactionInflights transactionInflights
             ) {
                 return new TxManagerImpl(
                         txConfiguration,
@@ -150,7 +152,8 @@ public class ItLockTableTest extends IgniteAbstractTest {
                         () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                         new TestLocalRwTxCounter(),
                         resourcesRegistry,
-                        resourceCleanupManager
+                        resourceCleanupManager,
+                        transactionInflights
                 );
             }
         };
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index 389259130e..7316ce78ab 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -116,7 +117,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes
                     ClusterNode node,
                     PlacementDriver placementDriver,
                     RemotelyTriggeredResourceRegistry resourcesRegistry,
-                    ResourceCleanupManager resourceCleanupManager
+                    ResourceCleanupManager resourceCleanupManager,
+                    TransactionInflights transactionInflights
             ) {
                 return new TxManagerImpl(
                         txConfiguration,
@@ -129,7 +131,8 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends TxAbstractTes
                         () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                         new TestLocalRwTxCounter(),
                         resourcesRegistry,
-                        resourceCleanupManager
+                        resourceCleanupManager,
+                        transactionInflights
                 ) {
                     @Override
                     public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable runnable) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index f2ee075757..c1e7d69085 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -188,6 +188,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
@@ -1090,11 +1091,14 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
 
             var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
 
+            TransactionInflights transactionInflights = new TransactionInflights(placementDriver);
+
             ResourceCleanupManager resourceCleanupManager = new ResourceCleanupManager(
                     name,
                     resourcesRegistry,
                     clusterService.topologyService(),
-                    clusterService.messagingService()
+                    clusterService.messagingService(),
+                    transactionInflights
             );
 
             txManager = new TxManagerImpl(
@@ -1108,7 +1112,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
                     partitionIdleSafeTimePropagationPeriodMsSupplier,
                     new TestLocalRwTxCounter(),
                     resourcesRegistry,
-                    resourceCleanupManager
+                    resourceCleanupManager,
+                    transactionInflights
             );
 
             cfgStorage = new DistributedConfigurationStorage("test", metaStorageManager);
@@ -1216,7 +1221,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
                     resourcesRegistry,
                     rebalanceScheduler,
                     lowWatermark,
-                    ForkJoinPool.commonPool()
+                    ForkJoinPool.commonPool(),
+                    transactionInflights
             ) {
                 @Override
                 protected TxStateTableStorage createTxStateTableStorage(
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 4d321081e6..d599540c39 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -100,6 +101,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
@@ -166,11 +168,16 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
 
         RemotelyTriggeredResourceRegistry resourcesRegistry = new RemotelyTriggeredResourceRegistry();
 
+        PlacementDriver placementDriver = new TestPlacementDriver(clusterNode);
+
+        TransactionInflights transactionInflights = new TransactionInflights(placementDriver);
+
         ResourceCleanupManager resourceCleanupManager = new ResourceCleanupManager(
                 clusterNode.name(),
                 resourcesRegistry,
                 clusterService.topologyService(),
-                clusterService.messagingService()
+                clusterService.messagingService(),
+                transactionInflights
         );
 
         txManager = new TxManagerImpl(
@@ -180,11 +187,12 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
                 new HeapLockManager(),
                 new HybridClockImpl(),
                 new TransactionIdGenerator(0xdeadbeef),
-                new TestPlacementDriver(clusterNode),
+                placementDriver,
                 () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                 new TestLocalRwTxCounter(),
                 resourcesRegistry,
-                resourceCleanupManager
+                resourceCleanupManager,
+                transactionInflights
         ) {
             @Override
             public CompletableFuture<Void> finish(
@@ -292,7 +300,8 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
                 new HybridClockImpl(),
                 observableTimestampTracker,
                 new TestPlacementDriver(clusterNode),
-                new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new SingleClusterNodeResolver(clusterNode))
+                new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new SingleClusterNodeResolver(clusterNode)),
+                transactionInflights
         );
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e8d3d12ae0..216de7595c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -198,6 +198,7 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
 import org.apache.ignite.internal.tx.storage.state.ThreadAssertingTxStateTableStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -398,6 +399,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
     private final Executor asyncContinuationExecutor;
 
+    private final TransactionInflights transactionInflights;
+
     /**
      * Creates a new table manager.
      *
@@ -425,6 +428,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
      * @param lowWatermark Low watermark.
      * @param asyncContinuationExecutor Executor to which execution will be 
resubmitted when leaving asynchronous public API endpoints
      *     (so as to prevent the user from stealing Ignite threads).
+     * @param transactionInflights Transaction inflights.
      */
     public TableManager(
             String nodeName,
@@ -458,7 +462,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
             ScheduledExecutorService rebalanceScheduler,
             LowWatermark lowWatermark,
-            Executor asyncContinuationExecutor
+            Executor asyncContinuationExecutor,
+            TransactionInflights transactionInflights
     ) {
         this.topologyService = topologyService;
         this.raftMgr = raftMgr;
@@ -484,6 +489,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.rebalanceScheduler = rebalanceScheduler;
         this.lowWatermark = lowWatermark;
         this.asyncContinuationExecutor = asyncContinuationExecutor;
+        this.transactionInflights = transactionInflights;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
         this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -1289,7 +1295,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 clock,
                 observableTimestampTracker,
                 executorInclinedPlacementDriver,
-                tableRaftService
+                tableRaftService,
+                transactionInflights
         );
 
         var table = new TableImpl(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index aaa32b6284..68446acc84 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -120,6 +121,10 @@ public class InternalTableImpl implements InternalTable {
     /** Primary replica await timeout. */
     public static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 30;
 
+    /** Default no-op implementation to avoid unnecessary allocations. */
+    private static final ReadWriteInflightBatchRequestTracker 
READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER =
+            new ReadWriteInflightBatchRequestTracker();
+
     /** Partitions. */
     private final int partitions;
 
@@ -135,6 +140,8 @@ public class InternalTableImpl implements InternalTable {
     /** Transactional manager. */
     protected final TxManager txManager;
 
+    private final TransactionInflights transactionInflights;
+
     /** Storage for table data. */
     private final MvTableStorage tableStorage;
 
@@ -182,6 +189,7 @@ public class InternalTableImpl implements InternalTable {
      * @param clock A hybrid logical clock.
      * @param placementDriver Placement driver.
      * @param tableRaftService Table raft service.
+     * @param transactionInflights Transaction inflights.
      */
     public InternalTableImpl(
             String tableName,
@@ -195,7 +203,8 @@ public class InternalTableImpl implements InternalTable {
             HybridClock clock,
             HybridTimestampTracker observableTimestampTracker,
             PlacementDriver placementDriver,
-            TableRaftServiceImpl tableRaftService
+            TableRaftServiceImpl tableRaftService,
+            TransactionInflights transactionInflights
     ) {
         this.tableName = tableName;
         this.tableId = tableId;
@@ -210,6 +219,7 @@ public class InternalTableImpl implements InternalTable {
         this.observableTimestampTracker = observableTimestampTracker;
         this.placementDriver = placementDriver;
         this.tableRaftService = tableRaftService;
+        this.transactionInflights = transactionInflights;
     }
 
     /** {@inheritDoc} */
@@ -532,6 +542,8 @@ public class InternalTableImpl implements InternalTable {
             @Nullable BiPredicate<R, ReplicaRequest> noWriteChecker,
             boolean retryOnLockConflict
     ) {
+        assert !tx.isReadOnly() : format("Tracking invoke is available only 
for read-write transactions [tx={}].", tx);
+
         ReplicaRequest request = 
mapFunc.apply(primaryReplicaAndConsistencyToken.get2());
 
         boolean write = request instanceof SingleRowReplicaRequest && 
((SingleRowReplicaRequest) request).requestType() != RW_GET
@@ -542,7 +554,7 @@ public class InternalTableImpl implements InternalTable {
 
         if (write && !full) {
             // Track only write requests from explicit transactions.
-            if (!txManager.addInflight(tx.id())) {
+            if (!transactionInflights.addInflight(tx.id(), false)) {
                 return failedFuture(
                         new TransactionException(TX_ALREADY_FINISHED_ERR, 
format(
                                 "Transaction is already finished 
[tableName={}, partId={}, txState={}].",
@@ -557,13 +569,13 @@ public class InternalTableImpl implements InternalTable {
 
                 // Remove inflight if no replication was scheduled, otherwise 
inflight will be removed by delayed response.
                 if (noWriteChecker.test(res, request)) {
-                    txManager.removeInflight(tx.id());
+                    transactionInflights.removeInflight(tx.id());
                 }
 
                 return res;
             }).exceptionally(e -> {
                 if (retryOnLockConflict && e.getCause() instanceof 
LockException) {
-                    txManager.removeInflight(tx.id()); // Will be retried.
+                    transactionInflights.removeInflight(tx.id()); // Will be 
retried.
                 }
 
                 ExceptionUtils.sneakyThrow(e);
@@ -1469,7 +1481,8 @@ public class InternalTableImpl implements InternalTable {
                     return replicaSvc.invoke(recipientNode, request);
                 },
                 // TODO: IGNITE-17666 Close cursor tx finish.
-                (intentionallyClose, fut) -> completeScan(txId, 
tablePartitionId, fut, recipientNode, intentionallyClose)
+                (intentionallyClose, fut) -> completeScan(txId, 
tablePartitionId, fut, recipientNode, intentionallyClose),
+                new ReadOnlyInflightBatchRequestTracker(transactionInflights, 
txId)
         );
     }
 
@@ -1530,7 +1543,8 @@ public class InternalTableImpl implements InternalTable {
                     }
 
                     return postEnlist(opFut, intentionallyClose, actualTx, 
implicit && !intentionallyClose);
-                }
+                },
+                READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER
         );
     }
 
@@ -1572,7 +1586,9 @@ public class InternalTableImpl implements InternalTable {
                     return replicaSvc.invoke(recipient.node(), request);
                 },
                 // TODO: IGNITE-17666 Close cursor tx finish.
-                (intentionallyClose, fut) -> completeScan(txId, 
tablePartitionId, fut, recipient.node(), intentionallyClose));
+                (intentionallyClose, fut) -> completeScan(txId, 
tablePartitionId, fut, recipient.node(), intentionallyClose),
+                READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER
+        );
     }
 
     /**
@@ -1801,19 +1817,24 @@ public class InternalTableImpl implements InternalTable 
{
         /** True when the publisher has a subscriber, false otherwise. */
         private final AtomicBoolean subscribed;
 
+        private final InflightBatchRequestTracker inflightBatchRequestTracker;
+
         /**
          * The constructor.
          *
          * @param retrieveBatch Closure that gets a new batch from the remote 
replica.
          * @param onClose The closure will be applied when {@link 
Subscription#cancel} is invoked directly or the cursor is
          *         finished.
+         * @param inflightBatchRequestTracker {@link 
InflightBatchRequestTracker} to track completion of batch requests.
          */
         PartitionScanPublisher(
                 BiFunction<Long, Integer, 
CompletableFuture<Collection<BinaryRow>>> retrieveBatch,
-                BiFunction<Boolean, CompletableFuture<Long>, 
CompletableFuture<Void>> onClose
+                BiFunction<Boolean, CompletableFuture<Long>, 
CompletableFuture<Void>> onClose,
+                InflightBatchRequestTracker inflightBatchRequestTracker
         ) {
             this.retrieveBatch = retrieveBatch;
             this.onClose = onClose;
+            this.inflightBatchRequestTracker = inflightBatchRequestTracker;
 
             this.subscribed = new AtomicBoolean(false);
         }
@@ -1926,10 +1947,14 @@ public class InternalTableImpl implements InternalTable 
{
                     return;
                 }
 
+                inflightBatchRequestTracker.onRequestBegin();
+
                 retrieveBatch.apply(scanId, n).thenAccept(binaryRows -> {
                     assert binaryRows != null;
                     assert binaryRows.size() <= n : "Rows more then requested 
" + binaryRows.size() + " " + n;
 
+                    inflightBatchRequestTracker.onRequestEnd();
+
                     binaryRows.forEach(subscriber::onNext);
 
                     if (binaryRows.size() < n) {
@@ -1942,6 +1967,8 @@ public class InternalTableImpl implements InternalTable {
                         }
                     }
                 }).exceptionally(t -> {
+                    inflightBatchRequestTracker.onRequestEnd();
+
                     cancel(t, false);
 
                     return null;
@@ -1950,6 +1977,58 @@ public class InternalTableImpl implements InternalTable {
         }
     }
 
+    /**
+     * Created for {@code PartitionScanSubscription} to track inflight batch 
requests.
+     */
+    private interface InflightBatchRequestTracker {
+        void onRequestBegin();
+
+        void onRequestEnd();
+    }
+
+    /**
+     * This is, in fact, a no-op {@code InflightBatchRequestTracker} because 
tracking batch requests for read-write transactions is not
+     * needed.
+     */
+    private static class ReadWriteInflightBatchRequestTracker implements 
InflightBatchRequestTracker {
+        @Override
+        public void onRequestBegin() {
+            // No-op.
+        }
+
+        @Override
+        public void onRequestEnd() {
+            // No-op.
+        }
+    }
+
+    private static class ReadOnlyInflightBatchRequestTracker implements 
InflightBatchRequestTracker {
+        private final TransactionInflights transactionInflights;
+
+        private final UUID txId;
+
+        ReadOnlyInflightBatchRequestTracker(TransactionInflights 
transactionInflights, UUID txId) {
+            this.transactionInflights = transactionInflights;
+            this.txId = txId;
+        }
+
+        @Override
+        public void onRequestBegin() {
+            // Track read-only requests which are able to create cursors.
+            if (!transactionInflights.addInflight(txId, true)) {
+                throw new TransactionException(TX_ALREADY_FINISHED_ERR, format(
+                        "Transaction is already finished () [txId={}, 
readOnly=true].",
+                        txId
+                ));
+            }
+        }
+
+        @Override
+        public void onRequestEnd() {
+            transactionInflights.removeInflight(txId);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override
     public void close() {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index f6346539d8..3b4c8d4fb6 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
@@ -104,6 +105,7 @@ import 
org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.network.ClusterNode;
@@ -275,6 +277,8 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
 
         Consumer<LongFunction<CompletableFuture<?>>> revisionUpdater = c -> 
metaStorageManager.registerRevisionUpdateListener(c::apply);
 
+        PlacementDriver placementDriver = new TestPlacementDriver(node);
+
         lowWatermark = new TestLowWatermark();
         lowWatermark.update(savedWatermark);
         tableManager = new TableManager(
@@ -304,12 +308,13 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 new AlwaysSyncedSchemaSyncService(),
                 catalogManager,
                 new HybridTimestampTracker(),
-                new TestPlacementDriver(node),
+                placementDriver,
                 () -> mock(IgniteSql.class),
                 new RemotelyTriggeredResourceRegistry(),
                 mock(ScheduledExecutorService.class),
                 lowWatermark,
-                ForkJoinPool.commonPool()
+                ForkJoinPool.commonPool(),
+                new TransactionInflights(placementDriver)
         ) {
 
             @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 9dee6283ab..00a4ce4480 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -127,6 +127,7 @@ import 
org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.CursorUtils;
@@ -780,7 +781,8 @@ public class TableManagerTest extends IgniteAbstractTest {
                 new RemotelyTriggeredResourceRegistry(),
                 mock(ScheduledExecutorService.class),
                 lowWatermark,
-                ForkJoinPool.commonPool()
+                ForkJoinPool.commonPool(),
+                mock(TransactionInflights.class)
         ) {
 
             @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 5bb8b89b96..b8871d1007 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -46,6 +46,7 @@ import 
org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
@@ -70,7 +71,8 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 mock(HybridClock.class),
                 new HybridTimestampTracker(),
                 mock(PlacementDriver.class),
-                new TableRaftServiceImpl("test", 1, Int2ObjectMaps.emptyMap(), 
new SingleClusterNodeResolver(mock(ClusterNode.class)))
+                new TableRaftServiceImpl("test", 1, Int2ObjectMaps.emptyMap(), 
new SingleClusterNodeResolver(mock(ClusterNode.class))),
+                mock(TransactionInflights.class)
         );
 
         // Let's check the empty table.
@@ -116,7 +118,8 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 mock(HybridClock.class),
                 new HybridTimestampTracker(),
                 mock(PlacementDriver.class),
-                new TableRaftServiceImpl("test", 3, Int2ObjectMaps.emptyMap(), 
new SingleClusterNodeResolver(mock(ClusterNode.class)))
+                new TableRaftServiceImpl("test", 3, Int2ObjectMaps.emptyMap(), 
new SingleClusterNodeResolver(mock(ClusterNode.class))),
+                mock(TransactionInflights.class)
         );
 
         List<BinaryRowEx> originalRows = List.of(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 7b9d20e114..ee9fa70414 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -136,6 +136,7 @@ import 
org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
@@ -202,8 +203,12 @@ public class ItTxTestCluster {
 
     protected Map<String, TxManager> txManagers;
 
+    protected Map<String, TransactionInflights> txInflights;
+
     protected Map<String, RemotelyTriggeredResourceRegistry> cursorRegistries;
 
+    private TransactionInflights clientTransactionInflights;
+
     protected TxManager clientTxManager;
 
     protected TransactionStateResolver clientTxStateResolver;
@@ -334,6 +339,7 @@ public class ItTxTestCluster {
         replicaManagers = new HashMap<>(nodes);
         replicaServices = new HashMap<>(nodes);
         txManagers = new HashMap<>(nodes);
+        txInflights = new HashMap<>(nodes);
         cursorRegistries = new HashMap<>(nodes);
         txStateStorages = new HashMap<>(nodes);
 
@@ -398,11 +404,16 @@ public class ItTxTestCluster {
 
             var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
 
+            TransactionInflights transactionInflights = new 
TransactionInflights(placementDriver);
+
+            txInflights.put(node.name(), transactionInflights);
+
             ResourceCleanupManager resourceCleanupManager = new 
ResourceCleanupManager(
                     node.name(),
                     resourcesRegistry,
                     clusterService.topologyService(),
-                    clusterService.messagingService()
+                    clusterService.messagingService(),
+                    transactionInflights
             );
 
             cursorRegistries.put(node.name(), resourcesRegistry);
@@ -415,7 +426,8 @@ public class ItTxTestCluster {
                     node,
                     placementDriver,
                     resourcesRegistry,
-                    resourceCleanupManager
+                    resourceCleanupManager,
+                    transactionInflights
             );
 
             txMgr.start();
@@ -436,6 +448,7 @@ public class ItTxTestCluster {
         } else {
             // Collocated mode.
             clientTxManager = txManagers.get(localNodeName);
+            clientTransactionInflights = txInflights.get(localNodeName);
         }
 
         igniteTransactions = new IgniteTransactionsImpl(clientTxManager, 
timestampTracker);
@@ -451,7 +464,8 @@ public class ItTxTestCluster {
             ClusterNode node,
             PlacementDriver placementDriver,
             RemotelyTriggeredResourceRegistry resourcesRegistry,
-            ResourceCleanupManager resourceCleanupManager
+            ResourceCleanupManager resourceCleanupManager,
+            TransactionInflights transactionInflights
     ) {
         return new TxManagerImpl(
                 node.name(),
@@ -467,7 +481,8 @@ public class ItTxTestCluster {
                 new TestLocalRwTxCounter(),
                 partitionOperationsExecutor,
                 resourcesRegistry,
-                resourceCleanupManager
+                resourceCleanupManager,
+                transactionInflights
         );
     }
 
@@ -702,7 +717,8 @@ public class ItTxTestCluster {
                         startClient ? clientClock : clocks.get(localNodeName),
                         timestampTracker,
                         placementDriver,
-                        new TableRaftServiceImpl(tableName, 1, clients, 
nodeResolver)
+                        new TableRaftServiceImpl(tableName, 1, clients, 
nodeResolver),
+                        clientTransactionInflights
                 ),
                 new DummySchemaManagerImpl(schemaDescriptor),
                 clientTxManager.lockManager(),
@@ -924,11 +940,14 @@ public class ItTxTestCluster {
     private void initializeClientTxComponents() {
         RemotelyTriggeredResourceRegistry resourceRegistry = new 
RemotelyTriggeredResourceRegistry();
 
+        clientTransactionInflights = new TransactionInflights(placementDriver);
+
         ResourceCleanupManager resourceCleanupManager = new 
ResourceCleanupManager(
                 "client",
                 resourceRegistry,
                 client.topologyService(),
-                client.messagingService()
+                client.messagingService(),
+                clientTransactionInflights
         );
 
         clientTxManager = new TxManagerImpl(
@@ -945,7 +964,8 @@ public class ItTxTestCluster {
                 new TestLocalRwTxCounter(),
                 partitionOperationsExecutor,
                 resourceRegistry,
-                resourceCleanupManager
+                resourceCleanupManager,
+                clientTransactionInflights
         );
 
         clientTxStateResolver = new TransactionStateResolver(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 9c26b73ece..c9bba46a80 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
@@ -164,14 +165,9 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         this(
                 replicaSvc,
                 new TestMvPartitionStorage(0),
-                false,
-                null,
                 schema,
-                new HybridTimestampTracker(),
-                new TestPlacementDriver(LOCAL_NODE),
-                storageUpdateConfiguration,
                 txConfiguration,
-                new RemotelyTriggeredResourceRegistry()
+                storageUpdateConfiguration
         );
     }
 
@@ -201,7 +197,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 new TestPlacementDriver(LOCAL_NODE),
                 storageUpdateConfiguration,
                 txConfiguration,
-                new RemotelyTriggeredResourceRegistry()
+                new RemotelyTriggeredResourceRegistry(),
+                new TransactionInflights(new TestPlacementDriver(LOCAL_NODE))
         );
     }
 
@@ -228,7 +225,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
             PlacementDriver placementDriver,
             StorageUpdateConfiguration storageUpdateConfiguration,
             TransactionConfiguration txConfiguration,
-            RemotelyTriggeredResourceRegistry resourcesRegistry
+            RemotelyTriggeredResourceRegistry resourcesRegistry,
+            TransactionInflights transactionInflights
     ) {
         super(
                 "test",
@@ -247,7 +245,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                         1,
                         Int2ObjectMaps.singleton(PART_ID, 
mock(RaftGroupService.class)),
                         new SingleClusterNodeResolver(LOCAL_NODE)
-                )
+                ),
+                transactionInflights
         );
 
         RaftGroupService svc = 
tableRaftService().partitionRaftGroupService(PART_ID);
@@ -460,11 +459,14 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         when(clusterService.messagingService()).thenReturn(new 
DummyMessagingService(LOCAL_NODE));
         when(clusterService.topologyService()).thenReturn(topologyService);
 
+        TransactionInflights transactionInflights = new 
TransactionInflights(placementDriver);
+
         ResourceCleanupManager resourceCleanupManager = new 
ResourceCleanupManager(
                 LOCAL_NODE.name(),
                 resourcesRegistry,
                 clusterService.topologyService(),
-                clusterService.messagingService()
+                clusterService.messagingService(),
+                transactionInflights
         );
 
         var txManager = new TxManagerImpl(
@@ -478,7 +480,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                 new TestLocalRwTxCounter(),
                 resourcesRegistry,
-                resourceCleanupManager
+                resourceCleanupManager,
+                transactionInflights
         );
 
         txManager.start();
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index cbbd1516cf..ad41534ffe 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -191,21 +191,6 @@ public interface TxManager extends IgniteComponent {
      */
     CompletableFuture<Void> updateLowWatermark(HybridTimestamp 
newLowWatermark);
 
-    /**
-     * Registers the infligh update for a transaction.
-     *
-     * @param txId The transaction id.
-     * @return {@code True} if the inflight was registered. The update must be 
failed on false.
-     */
-    boolean addInflight(UUID txId);
-
-    /**
-     * Unregisters the inflight for a transaction.
-     *
-     * @param txId The transction id
-     */
-    void removeInflight(UUID txId);
-
     /** Returns the node's hybrid clock. */
     HybridClock clock();
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
index b751d7faf3..bac0426475 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedReadOnlyTransactionTracker.java
@@ -18,13 +18,10 @@
 package org.apache.ignite.internal.tx.impl;
 
 import static java.util.concurrent.CompletableFuture.allOf;
-import static java.util.stream.Collectors.toSet;
 
 import java.util.Collection;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
@@ -35,60 +32,59 @@ import org.apache.ignite.network.TopologyService;
  * Keeps track of all finished RO transactions.
  */
 public class FinishedReadOnlyTransactionTracker {
-
-    private static final int MAX_FINISHED_TRANSACTIONS_IN_BATCH = 10_000;
-
     /** Tx messages factory. */
     private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
 
-    /** A collection of finished read only transactions ordered by the time 
when when they were finished. */
-    private final Collection<UUID> finishedTransactions = new 
LinkedBlockingQueue<>();
-
     /** Topology service. */
     private final TopologyService topologyService;
 
     /** Messaging service. */
     private final MessagingService messagingService;
 
+    /** Transaction inflights. */
+    private final TransactionInflights transactionInflights;
+
     /**
      * Constructor.
      *
      * @param topologyService Topology service.
      * @param messagingService Messaging service.
+     * @param transactionInflights Transaction inflights.
      */
-    public FinishedReadOnlyTransactionTracker(TopologyService topologyService, 
MessagingService messagingService) {
+    public FinishedReadOnlyTransactionTracker(
+            TopologyService topologyService,
+            MessagingService messagingService,
+            TransactionInflights transactionInflights
+    ) {
         this.topologyService = topologyService;
         this.messagingService = messagingService;
+        this.transactionInflights = transactionInflights;
     }
 
     /**
      * Send close cursors batch message to all cluster nodes.
      */
     public void broadcastClosedTransactions() {
-        if (finishedTransactions.isEmpty()) {
-            return;
+        Collection<UUID> txToSend = 
transactionInflights.finishedReadOnlyTransactions();
+
+        if (!txToSend.isEmpty()) {
+            FinishedTransactionsBatchMessage message = 
FACTORY.finishedTransactionsBatchMessage()
+                    .transactions(txToSend)
+                    .build();
+
+            CompletableFuture<?>[] messages = topologyService.allMembers()
+                    .stream()
+                    .map(clusterNode -> sendCursorCleanupCommand(clusterNode, 
message))
+                    .toArray(CompletableFuture[]::new);
+            allOf(messages).thenRun(() -> 
transactionInflights.removeTxContexts(txToSend));
         }
-
-        Set<UUID> txToSend = finishedTransactions.stream()
-                .limit(MAX_FINISHED_TRANSACTIONS_IN_BATCH)
-                .collect(toSet());
-
-        FinishedTransactionsBatchMessage message = 
FACTORY.finishedTransactionsBatchMessage()
-                .transactions(txToSend)
-                .build();
-
-        CompletableFuture<?>[] messages = topologyService.allMembers()
-                .stream()
-                .map(clusterNode -> sendCursorCleanupCommand(clusterNode, 
message))
-                .toArray(CompletableFuture[]::new);
-        allOf(messages).thenRun(() -> 
finishedTransactions.removeAll(txToSend));
     }
 
     private CompletableFuture<Void> sendCursorCleanupCommand(ClusterNode node, 
FinishedTransactionsBatchMessage message) {
         return messagingService.send(node, message);
     }
 
-    public void onTransactionFinished(UUID id) {
-        finishedTransactions.add(id);
+    void onTransactionFinished(UUID id) {
+        transactionInflights.markReadOnlyTxFinished(id);
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index b73987e238..34affae0b7 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -41,9 +41,6 @@ class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /** The tracker is used to track an observable timestamp. */
     private final HybridTimestampTracker observableTsTracker;
 
-    /** Cleanup manager for tracking closed transactions. */
-    private final ResourceCleanupManager resourceCleanupManager;
-
     /**
      * The constructor.
      *
@@ -58,14 +55,12 @@ class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
             HybridTimestampTracker observableTsTracker,
             UUID id,
             String txCoordinatorId,
-            HybridTimestamp readTimestamp,
-            ResourceCleanupManager resourceCleanupManager
+            HybridTimestamp readTimestamp
     ) {
         super(txManager, id, txCoordinatorId);
 
         this.readTimestamp = readTimestamp;
         this.observableTsTracker = observableTsTracker;
-        this.resourceCleanupManager = resourceCleanupManager;
     }
 
     @Override
@@ -119,8 +114,6 @@ class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
             return nullCompletedFuture();
         }
 
-        resourceCleanupManager.onTransactionFinished(id());
-
         observableTsTracker.update(executionTimestamp);
 
         return ((TxManagerImpl) 
txManager).completeReadOnlyTransactionFuture(new 
TxIdAndTimestamp(readTimestamp, id()));
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
index 55cce511c6..cb51853794 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceCleanupManager.java
@@ -74,12 +74,14 @@ public class ResourceCleanupManager implements 
IgniteComponent {
      * @param resourceRegistry Resources registry.
      * @param topologyService Topology service.
      * @param messagingService Messaging service.
+     * @param transactionInflights Transaction inflights.
      */
     public ResourceCleanupManager(
             String nodeName,
             RemotelyTriggeredResourceRegistry resourceRegistry,
             TopologyService topologyService,
-            MessagingService messagingService
+            MessagingService messagingService,
+            TransactionInflights transactionInflights
     ) {
         this.resourceRegistry = resourceRegistry;
         this.clusterNodeResolver = topologyService;
@@ -87,7 +89,11 @@ public class ResourceCleanupManager implements 
IgniteComponent {
                 RESOURCE_CLEANUP_EXECUTOR_SIZE,
                 NamedThreadFactory.create(nodeName, 
"resource-cleanup-executor", LOG)
         );
-        this.finishedReadOnlyTransactionTracker = new 
FinishedReadOnlyTransactionTracker(topologyService, messagingService);
+        this.finishedReadOnlyTransactionTracker = new 
FinishedReadOnlyTransactionTracker(
+                topologyService,
+                messagingService,
+                transactionInflights
+        );
         this.finishedTransactionBatchRequestHandler =
                 new FinishedTransactionBatchRequestHandler(messagingService, 
resourceRegistry, resourceCleanupExecutor);
     }
@@ -120,7 +126,12 @@ public class ResourceCleanupManager implements 
IgniteComponent {
         shutdownAndAwaitTermination(resourceCleanupExecutor, 10, 
TimeUnit.SECONDS);
     }
 
-    public void onTransactionFinished(UUID id) {
+    /**
+     * Is called on the finish of read only transaction.
+     *
+     * @param id Transaction id.
+     */
+    void onReadOnlyTransactionFinished(UUID id) {
         finishedReadOnlyTransactionTracker.onTransactionFinished(id);
     }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
new file mode 100644
index 0000000000..b303532fe9
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_PRIMARY_REPLICA_EXPIRED_ERR;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
+import org.apache.ignite.internal.tx.TransactionResult;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains counters for in-flight requests of the transactions. Read-write 
transactions can't finish when some requests are in-flight.
+ * Read-only transactions can't be included into {@link 
org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage} when
+ * some requests are in-flight.
+ */
+public class TransactionInflights {
+    /** Hint for maximum concurrent txns. */
+    private static final int MAX_CONCURRENT_TXNS = 1024;
+
+    /** Txn contexts. */
+    private final ConcurrentHashMap<UUID, TxContext> txCtxMap = new 
ConcurrentHashMap<>(MAX_CONCURRENT_TXNS);
+
+    private final PlacementDriver placementDriver;
+
+    /**
+     * Creates the tracker of in-flight requests.
+     *
+     * @param placementDriver Placement driver; it is handed to every
+     *     read-write transaction context created by this tracker.
+     */
+    public TransactionInflights(PlacementDriver placementDriver) {
+        this.placementDriver = placementDriver;
+    }
+
+    /**
+     * Registers the inflight update for a transaction. A context is created
+     * for the transaction if none exists yet; {@code readOnly} is consulted
+     * only at that moment, an already existing context is reused as is.
+     *
+     * @param txId The transaction id.
+     * @param readOnly Whether the transaction is read-only.
+     * @return {@code True} if the inflight was registered, {@code false} if
+     *     the transaction context reports that it is already finishing.
+     *     The update must be failed on false.
+     */
+    public boolean addInflight(UUID txId, boolean readOnly) {
+        // One-element array: lets the lambda below publish its result.
+        boolean[] res = {true};
+
+        // The counter is changed only inside compute(), see the
+        // "Updated under lock" note on TxContext.inflights.
+        txCtxMap.compute(txId, (uuid, ctx) -> {
+            if (ctx == null) {
+                ctx = readOnly ? new ReadOnlyTxContext() : new 
ReadWriteTxContext(placementDriver);
+            }
+
+            res[0] = ctx.addInflight();
+
+            return ctx;
+        });
+
+        return res[0];
+    }
+
+    /**
+     * Unregisters the inflight for a transaction. The counter is decremented
+     * inside {@code compute()}; afterwards, outside of it, the context is
+     * notified through {@code onInflightsRemoved()}.
+     *
+     * <p>A context for the transaction is expected to exist and to have at
+     * least one registered inflight; both conditions are checked by
+     * assertions only.
+     *
+     * @param txId The transaction id.
+     */
+    public void removeInflight(UUID txId) {
+        TxContext tuple = txCtxMap.compute(txId, (uuid, ctx) -> {
+            assert ctx != null : format("No tx context found on removing 
inflight [txId={}]", txId);
+
+            ctx.removeInflight(txId);
+
+            return ctx;
+        });
+
+        // Avoid completion under lock.
+        tuple.onInflightsRemoved();
+    }
+
+    /**
+     * Collects the ids of read-only transactions that are ready to be
+     * reported as finished: the context is a read-only one, it was marked
+     * finished and it has no in-flight requests left.
+     *
+     * @return Set of matching transaction ids, possibly empty.
+     */
+    Collection<UUID> finishedReadOnlyTransactions() {
+        return txCtxMap.entrySet().stream()
+                .filter(e -> e.getValue() instanceof ReadOnlyTxContext && 
e.getValue().isReadyToFinish())
+                .map(Entry::getKey)
+                .collect(toSet());
+    }
+
+    /**
+     * Drops the contexts of the given transactions from the map.
+     * {@link FinishedReadOnlyTransactionTracker} calls it once the batch
+     * message with these ids has been sent to all topology members.
+     *
+     * @param txIds Ids of the transactions whose contexts are removed.
+     */
+    void removeTxContexts(Collection<UUID> txIds) {
+        txCtxMap.keySet().removeAll(txIds);
+    }
+
+    /**
+     * Cancels the wait for in-flight requests of every read-write transaction
+     * that is already finishing and has the given group among its enlisted
+     * groups. The wait is completed exceptionally, see
+     * {@code ReadWriteTxContext#cancelWaitingInflights}.
+     *
+     * <p>Called by {@code TxManagerImpl} from an event listener with the
+     * group id carried by the event.
+     *
+     * @param groupId Replication group id.
+     */
+    void cancelWaitingInflights(TablePartitionId groupId) {
+        for (Map.Entry<UUID, TxContext> ctxEntry : txCtxMap.entrySet()) {
+            // Read-only contexts have no enlisted groups, skip them.
+            if (ctxEntry.getValue() instanceof ReadWriteTxContext) {
+                ReadWriteTxContext txContext = (ReadWriteTxContext) 
ctxEntry.getValue();
+
+                // enlistedGroups is assigned by finishTx(), so it is read
+                // only for contexts that are already finishing.
+                if (txContext.isTxFinishing()) {
+                    IgniteBiTuple<ClusterNode, Long> nodeAndToken = 
txContext.enlistedGroups.get(groupId);
+
+                    if (nodeAndToken != null) {
+                        txContext.cancelWaitingInflights(groupId, 
nodeAndToken.get2());
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Marks a read-only transaction as finished, creating a read-only
+     * context for it if none exists. After that no new inflights can be
+     * registered for the transaction.
+     *
+     * <p>NOTE(review): an existing context is used regardless of its type,
+     * so this presumably must be called with ids of read-only transactions
+     * only - confirm against callers.
+     *
+     * @param txId The transaction id.
+     */
+    void markReadOnlyTxFinished(UUID txId) {
+        txCtxMap.compute(txId, (k, ctx) -> {
+            if (ctx == null) {
+                ctx = new ReadOnlyTxContext();
+            }
+
+            // The read-only context ignores the enlisted groups argument.
+            ctx.finishTx(null);
+
+            return ctx;
+        });
+    }
+
+    /**
+     * Switches a read-write transaction to the finishing state, so that
+     * {@link #addInflight(UUID, boolean)} refuses new inflights for it, and
+     * remembers the enlisted groups in the context. A read-write context is
+     * created if the transaction has none.
+     *
+     * <p>The transaction must not be finishing already; this is checked by
+     * an assertion only.
+     *
+     * <p>NOTE(review): the result is cast without a type check, so the id is
+     * presumably always one of a read-write transaction - confirm.
+     *
+     * @param txId The transaction id.
+     * @param enlistedGroups Enlisted groups mapped to a node and a
+     *     consistency token.
+     * @return The context of the transaction.
+     */
+    ReadWriteTxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId, 
IgniteBiTuple<ClusterNode, Long>> enlistedGroups) {
+        return (ReadWriteTxContext) txCtxMap.compute(txId, (uuid, tuple0) -> {
+            if (tuple0 == null) {
+                tuple0 = new ReadWriteTxContext(placementDriver); // No writes 
enlisted.
+            }
+
+            assert !tuple0.isTxFinishing() : "Transaction is already finished 
[id=" + uuid + "].";
+
+            tuple0.finishTx(enlistedGroups);
+
+            return tuple0;
+        });
+    }
+
+    /**
+     * Per-transaction state: the counter of in-flight requests plus the
+     * finishing logic supplied by subclasses.
+     */
+    abstract static class TxContext {
+        volatile long inflights = 0; // Updated under lock.
+
+        /**
+         * Increments the counter unless the transaction is finishing.
+         *
+         * @return {@code true} if the inflight was counted, {@code false} if
+         *     the transaction is already finishing.
+         */
+        boolean addInflight() {
+            if (isTxFinishing()) {
+                return false;
+            } else {
+                // noinspection NonAtomicOperationOnVolatileField
+                inflights++;
+                return true;
+            }
+        }
+
+        /**
+         * Decrements the counter. The counter must be positive, which is
+         * checked by an assertion only.
+         *
+         * @param txId The transaction id, used in the assertion message.
+         */
+        void removeInflight(UUID txId) {
+            assert inflights > 0 : format("No inflights, cannot remove any 
[txId={}, ctx={}]", txId, this);
+
+            // noinspection NonAtomicOperationOnVolatileField
+            inflights--;
+        }
+
+        /** Is called after an inflight was removed, outside of the map lock. */
+        abstract void onInflightsRemoved();
+
+        /**
+         * Moves the context to the finishing state.
+         *
+         * @param enlistedGroups Enlisted groups, may be {@code null}.
+         */
+        abstract void finishTx(@Nullable Map<TablePartitionId, 
IgniteBiTuple<ClusterNode, Long>> enlistedGroups);
+
+        /** Returns {@code true} if new inflights must be refused. */
+        abstract boolean isTxFinishing();
+
+        /** Returns {@code true} if the wait for inflights is over. */
+        abstract boolean isReadyToFinish();
+    }
+
+    /**
+     * Context of a read-only transaction. It only keeps the "finished" mark
+     * next to the inherited counter of inflights.
+     */
+    private static class ReadOnlyTxContext extends TxContext {
+        /** Set once by {@link #finishTx(Map)} and never reset. */
+        private volatile boolean markedFinished;
+
+        @Override
+        public void onInflightsRemoved() {
+            // No-op.
+        }
+
+        /** Marks the transaction finished; the argument is not used. */
+        @Override
+        public void finishTx(@Nullable Map<TablePartitionId, 
IgniteBiTuple<ClusterNode, Long>> enlistedGroups) {
+            markedFinished = true;
+        }
+
+        @Override
+        public boolean isTxFinishing() {
+            return markedFinished;
+        }
+
+        /** Ready when marked finished and no inflights are left. */
+        @Override
+        public boolean isReadyToFinish() {
+            return markedFinished && inflights == 0;
+        }
+
+        @Override
+        public String toString() {
+            return "ReadOnlyTxContext [inflights=" + inflights + ']';
+        }
+    }
+
+    /**
+     * Context of a read-write transaction. Besides the inherited counter of
+     * inflights it keeps the futures that drive the finish of the
+     * transaction and the groups enlisted into it.
+     */
+    static class ReadWriteTxContext extends TxContext {
+        /**
+         * Completes normally when no inflights are left, or exceptionally
+         * when the wait is cancelled by
+         * {@link #cancelWaitingInflights(TablePartitionId, Long)}.
+         */
+        private final CompletableFuture<Void> waitRepFut = new 
CompletableFuture<>();
+        private final PlacementDriver placementDriver;
+        /** {@code null} until {@link #finishTx(Map)} is called. */
+        private volatile CompletableFuture<Void> finishInProgressFuture = null;
+        /** Assigned by {@link #finishTx(Map)}. */
+        private volatile Map<TablePartitionId, IgniteBiTuple<ClusterNode, 
Long>> enlistedGroups;
+
+        private ReadWriteTxContext(PlacementDriver placementDriver) {
+            this.placementDriver = placementDriver;
+        }
+
+        /**
+         * Waits until the transaction is ready to finish, runs the finish
+         * action and completes the finish future with the outcome.
+         * {@link #finishTx(Map)} must have been called before, as it creates
+         * the returned future.
+         *
+         * <p>NOTE(review): if {@code finishAction} throws synchronously or
+         * returns {@code null}, nothing here completes the returned future -
+         * confirm that callers never do that.
+         *
+         * @param commit Commit intent, {@code false} for rollback.
+         * @param finishAction Finish action. Its argument is {@code true}
+         *     only if commit was requested and the readiness wait succeeded.
+         * @return Future of the whole finish operation.
+         */
+        CompletableFuture<Void> performFinish(boolean commit, 
Function<Boolean, CompletableFuture<Void>> finishAction) {
+            waitReadyToFinish(commit)
+                    .whenComplete((ignoredReadyToFinish, readyException) -> 
finishAction.apply(commit && readyException == null)
+                            .whenComplete((ignoredFinishActionResult, 
finishException) ->
+                                    completeFinishInProgressFuture(commit, 
readyException, finishException))
+                    );
+
+            return finishInProgressFuture;
+        }
+
+        /**
+         * Completes the finish future. A failure of the readiness wait takes
+         * precedence over a failure of the finish action. On commit, an
+         * expired primary replica is reported as
+         * {@link MismatchingTransactionOutcomeException} with the
+         * {@code ABORTED} state.
+         *
+         * @param commit Commit intent.
+         * @param readyToFinishException Failure of the readiness wait.
+         * @param finishException Failure of the finish action.
+         */
+        private void completeFinishInProgressFuture(
+                boolean commit,
+                @Nullable Throwable readyToFinishException,
+                @Nullable Throwable finishException
+        ) {
+            if (readyToFinishException == null) {
+                if (finishException == null) {
+                    finishInProgressFuture.complete(null);
+                } else {
+                    
finishInProgressFuture.completeExceptionally(finishException);
+                }
+            } else {
+                if (commit && readyToFinishException instanceof 
PrimaryReplicaExpiredException) {
+                    finishInProgressFuture.completeExceptionally(new 
MismatchingTransactionOutcomeException(
+                            TX_PRIMARY_REPLICA_EXPIRED_ERR,
+                            "Failed to commit the transaction.",
+                            new TransactionResult(ABORTED, null),
+                            readyToFinishException
+                    ));
+                } else {
+                    
finishInProgressFuture.completeExceptionally(readyToFinishException);
+                }
+            }
+        }
+
+        /**
+         * On commit, verifies for every enlisted group that the current
+         * lease exists and its start time equals the consistency token taken
+         * at enlistment, then waits for the inflights. On rollback nothing
+         * is awaited.
+         *
+         * @param commit Commit intent.
+         * @return Future that fails with
+         *     {@link PrimaryReplicaExpiredException} if a lease check fails.
+         */
+        private CompletableFuture<Void> waitReadyToFinish(boolean commit) {
+            if (commit) {
+                for (Map.Entry<TablePartitionId, IgniteBiTuple<ClusterNode, 
Long>> e : enlistedGroups.entrySet()) {
+                    ReplicaMeta replicaMeta = 
placementDriver.currentLease(e.getKey());
+
+                    Long enlistmentConsistencyToken = e.getValue().get2();
+
+                    if (replicaMeta == null || 
!enlistmentConsistencyToken.equals(replicaMeta.getStartTime().longValue())) {
+                        return failedFuture(new 
PrimaryReplicaExpiredException(e.getKey(), enlistmentConsistencyToken, null, 
replicaMeta));
+                    }
+                }
+
+                return waitNoInflights();
+            } else {
+                return nullCompletedFuture();
+            }
+        }
+
+        /** Returns the wait future, completing it if no inflights are left. */
+        private CompletableFuture<Void> waitNoInflights() {
+            if (inflights == 0) {
+                waitRepFut.complete(null);
+            }
+            return waitRepFut;
+        }
+
+        /**
+         * Fails the wait for inflights with
+         * {@link PrimaryReplicaExpiredException}.
+         *
+         * @param groupId Replication group id.
+         * @param enlistmentConsistencyToken Token taken at enlistment.
+         */
+        void cancelWaitingInflights(TablePartitionId groupId, Long 
enlistmentConsistencyToken) {
+            waitRepFut.completeExceptionally(new 
PrimaryReplicaExpiredException(groupId, enlistmentConsistencyToken, null, 
null));
+        }
+
+        /** Completes the wait once the finish started and no inflights remain. */
+        @Override
+        public void onInflightsRemoved() {
+            if (inflights == 0 && finishInProgressFuture != null) {
+                waitRepFut.complete(null);
+            }
+        }
+
+        /** Stores the enlisted groups and creates the finish future. */
+        @Override
+        public void finishTx(Map<TablePartitionId, IgniteBiTuple<ClusterNode, 
Long>> enlistedGroups) {
+            this.enlistedGroups = enlistedGroups;
+            finishInProgressFuture = new CompletableFuture<>();
+        }
+
+        @Override
+        public boolean isTxFinishing() {
+            return finishInProgressFuture != null;
+        }
+
+        /** True once the wait future is done, normally or exceptionally. */
+        @Override
+        public boolean isReadyToFinish() {
+            return waitRepFut.isDone();
+        }
+
+        @Override
+        public String toString() {
+            return "ReadWriteTxContext [inflights=" + inflights + ", 
waitRepFut=" + waitRepFut
+                    + ", finishFut=" + finishInProgressFuture + ']';
+        }
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index e93463fc80..2f0c40e4de 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -22,6 +22,7 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.function.Function.identity;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
 import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
@@ -34,7 +35,6 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
-import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_PRIMARY_REPLICA_EXPIRED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
 
 import java.io.IOException;
@@ -45,7 +45,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
@@ -66,7 +65,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.ClusterService;
@@ -74,7 +72,6 @@ import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessageHandler;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -100,6 +97,7 @@ import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.TxStateMetaFinishing;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
+import 
org.apache.ignite.internal.tx.impl.TransactionInflights.ReadWriteTxContext;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -117,9 +115,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(TxManagerImpl.class);
 
-    /** Hint for maximum concurrent txns. */
-    private static final int MAX_CONCURRENT_TXNS = 1024;
-
     /** Transaction configuration. */
     private final TransactionConfiguration txConfig;
 
@@ -138,9 +133,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     /** The local state storage. */
     private final VolatileTxStateMetaStorage txStateVolatileStorage = new 
VolatileTxStateMetaStorage();
 
-    /** Txn contexts. */
-    private final ConcurrentHashMap<UUID, TxContext> txCtxMap = new 
ConcurrentHashMap<>(MAX_CONCURRENT_TXNS);
-
     /** Future of a read-only transaction by it {@link TxIdAndTimestamp}. */
     private final ConcurrentNavigableMap<TxIdAndTimestamp, 
CompletableFuture<Void>> readOnlyTxFutureById = new ConcurrentSkipListMap<>(
             
Comparator.comparing(TxIdAndTimestamp::getReadTimestamp).thenComparing(TxIdAndTimestamp::getTxId)
@@ -208,6 +200,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
     /** Cleanup manager for tx resources. */
     private final ResourceCleanupManager resourceCleanupManager;
 
+    private final TransactionInflights transactionInflights;
+
     /**
      * Test-only constructor.
      *
@@ -221,6 +215,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
      * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe 
time propagation period in ms.
      * @param localRwTxCounter Counter of read-write transactions that were 
created and completed locally on the node.
      * @param resourcesRegistry Resources registry.
+     * @param transactionInflights Transaction inflights.
      */
     @TestOnly
     public TxManagerImpl(
@@ -234,7 +229,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             LongSupplier idleSafeTimePropagationPeriodMsSupplier,
             LocalRwTxCounter localRwTxCounter,
             RemotelyTriggeredResourceRegistry resourcesRegistry,
-            ResourceCleanupManager resourceCleanupManager
+            ResourceCleanupManager resourceCleanupManager,
+            TransactionInflights transactionInflights
     ) {
         this(
                 clusterService.nodeName(),
@@ -250,7 +246,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
                 localRwTxCounter,
                 ForkJoinPool.commonPool(),
                 resourcesRegistry,
-                resourceCleanupManager
+                resourceCleanupManager,
+                transactionInflights
         );
     }
 
@@ -269,6 +266,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
      * @param localRwTxCounter Counter of read-write transactions that were 
created and completed locally on the node.
      * @param partitionOperationsExecutor Executor on which partition 
operations will be executed, if needed.
      * @param resourcesRegistry Resources registry.
+     * @param transactionInflights Transaction inflights.
      */
     public TxManagerImpl(
             String nodeName,
@@ -284,7 +282,8 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             LocalRwTxCounter localRwTxCounter,
             Executor partitionOperationsExecutor,
             RemotelyTriggeredResourceRegistry resourcesRegistry,
-            ResourceCleanupManager resourceCleanupManager
+            ResourceCleanupManager resourceCleanupManager,
+            TransactionInflights transactionInflights
     ) {
         this.txConfig = txConfig;
         this.lockManager = lockManager;
@@ -298,6 +297,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         this.localRwTxCounter = localRwTxCounter;
         this.partitionOperationsExecutor = partitionOperationsExecutor;
         this.resourceCleanupManager = resourceCleanupManager;
+        this.transactionInflights = transactionInflights;
 
         placementDriverHelper = new PlacementDriverHelper(placementDriver, 
clock);
 
@@ -337,17 +337,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
             TablePartitionId groupId = (TablePartitionId) 
eventParameters.groupId();
 
-            for (Map.Entry<UUID, TxContext> ctxEntry : txCtxMap.entrySet()) {
-                TxContext txContext = ctxEntry.getValue();
-
-                if (txContext.isTxFinishing()) {
-                    IgniteBiTuple<ClusterNode, Long> nodeAndToken = 
txContext.enlistedGroups.get(groupId);
-
-                    if (nodeAndToken != null) {
-                        txContext.cancelWaitingInflights(groupId, 
nodeAndToken.get2());
-                    }
-                }
-            }
+            transactionInflights.cancelWaitingInflights(groupId);
 
             return falseCompletedFuture();
         });
@@ -409,7 +399,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             );
         }
 
-        return new ReadOnlyTransactionImpl(this, timestampTracker, txId, 
localNodeId, readTimestamp, resourceCleanupManager);
+        return new ReadOnlyTransactionImpl(this, timestampTracker, txId, 
localNodeId, readTimestamp);
     }
 
     /**
@@ -511,7 +501,7 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
             }
         }
 
-        TxContext txContext = lockTxForNewUpdates(txId, enlistedGroups);
+        ReadWriteTxContext txContext = 
transactionInflights.lockTxForNewUpdates(txId, enlistedGroups);
 
         // Wait for commit acks first, then proceed with the finish request.
         return txContext.performFinish(commitIntent, commit ->
@@ -530,20 +520,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         });
     }
 
-    private TxContext lockTxForNewUpdates(UUID txId, Map<TablePartitionId, 
IgniteBiTuple<ClusterNode, Long>> enlistedGroups) {
-        return txCtxMap.compute(txId, (uuid, tuple0) -> {
-            if (tuple0 == null) {
-                tuple0 = new TxContext(placementDriver); // No writes enlisted.
-            }
-
-            assert !tuple0.isTxFinishing() : "Transaction is already finished 
[id=" + uuid + "].";
-
-            tuple0.finishTx(enlistedGroups);
-
-            return tuple0;
-        });
-    }
-
     private static CompletableFuture<Void> checkTxOutcome(boolean commit, UUID 
txId, TransactionMeta stateMeta) {
         if ((stateMeta.txState() == COMMITTED) == commit) {
             return nullCompletedFuture();
@@ -810,6 +786,10 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
 
         readOnlyTxFuture.complete(null);
 
+        UUID txId = txIdAndTimestamp.getTxId();
+
+        resourceCleanupManager.onReadOnlyTransactionFinished(txId);
+
         return readOnlyTxFuture;
     }
 
@@ -833,44 +813,6 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         return allOf(readOnlyTxFutures.toArray(CompletableFuture[]::new));
     }
 
-    @Override
-    public boolean addInflight(UUID txId) {
-        boolean[] res = {true};
-
-        txCtxMap.compute(txId, (uuid, tuple) -> {
-            if (tuple == null) {
-                tuple = new TxContext(placementDriver);
-            }
-
-            if (tuple.isTxFinishing()) {
-                res[0] = false;
-                return tuple;
-            } else {
-                // noinspection NonAtomicOperationOnVolatileField
-                tuple.inflights++;
-            }
-
-            return tuple;
-        });
-
-        return res[0];
-    }
-
-    @Override
-    public void removeInflight(UUID txId) {
-        TxContext tuple = txCtxMap.compute(txId, (uuid, ctx) -> {
-            assert ctx != null && ctx.inflights > 0 : ctx;
-
-            // noinspection NonAtomicOperationOnVolatileField
-            ctx.inflights--;
-
-            return ctx;
-        });
-
-        // Avoid completion under lock.
-        tuple.onRemovedInflights();
-    }
-
     @Override
     public HybridClock clock() {
         return clock;
@@ -893,7 +835,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
         Object result = response.result();
 
         if (result instanceof UUID) {
-            removeInflight((UUID) result);
+            transactionInflights.removeInflight((UUID) result);
         }
     }
 
@@ -929,7 +871,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
                             );
                         } else {
                             assert commitTimestamp.compareTo(currentPrimaryReplica.getExpirationTime()) <= 0 :
-                                    IgniteStringFormatter.format(
+                                    format(
                                             "Commit timestamp is greater than primary replica expiration timestamp:"
                                                     + " [groupId = {}, commit timestamp = {}, primary replica expiration timestamp = {}]",
                                             groupId, commitTimestamp, currentPrimaryReplica.getExpirationTime());
@@ -940,102 +882,6 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
         return allOf(verificationFutures);
     }
 
-    private static class TxContext {
-        volatile long inflights = 0; // Updated under lock.
-        private final CompletableFuture<Void> waitRepFut = new CompletableFuture<>();
-        private final PlacementDriver placementDriver;
-        volatile CompletableFuture<Void> finishInProgressFuture = null;
-        volatile Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups;
-
-        private TxContext(PlacementDriver placementDriver) {
-            this.placementDriver = placementDriver;
-        }
-
-        CompletableFuture<Void> performFinish(boolean commit, Function<Boolean, CompletableFuture<Void>> finishAction) {
-            waitReadyToFinish(commit)
-                    .whenComplete((ignoredReadyToFinish, readyException) -> finishAction.apply(commit && readyException == null)
-                            .whenComplete((ignoredFinishActionResult, finishException) ->
-                                    completeFinishInProgressFuture(commit, readyException, finishException))
-                    );
-
-            return finishInProgressFuture;
-        }
-
-        private void completeFinishInProgressFuture(
-                boolean commit,
-                @Nullable Throwable readyToFinishException,
-                @Nullable Throwable finishException
-        ) {
-            if (readyToFinishException == null) {
-                if (finishException == null) {
-                    finishInProgressFuture.complete(null);
-                } else {
-                    finishInProgressFuture.completeExceptionally(finishException);
-                }
-            } else {
-                if (commit && readyToFinishException instanceof PrimaryReplicaExpiredException) {
-                    finishInProgressFuture.completeExceptionally(new MismatchingTransactionOutcomeException(
-                            TX_PRIMARY_REPLICA_EXPIRED_ERR,
-                            "Failed to commit the transaction.",
-                            new TransactionResult(ABORTED, null),
-                            readyToFinishException
-                    ));
-                } else {
-                    finishInProgressFuture.completeExceptionally(readyToFinishException);
-                }
-            }
-        }
-
-        private CompletableFuture<Void> waitReadyToFinish(boolean commit) {
-            if (commit) {
-                for (Map.Entry<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> e : enlistedGroups.entrySet()) {
-                    ReplicaMeta replicaMeta = placementDriver.currentLease(e.getKey());
-
-                    Long enlistmentConsistencyToken = e.getValue().get2();
-
-                    if (replicaMeta == null || !enlistmentConsistencyToken.equals(replicaMeta.getStartTime().longValue())) {
-                        return failedFuture(new PrimaryReplicaExpiredException(e.getKey(), enlistmentConsistencyToken, null, replicaMeta));
-                    }
-                }
-
-                return waitNoInflights();
-            } else {
-                return nullCompletedFuture();
-            }
-        }
-
-        private CompletableFuture<Void> waitNoInflights() {
-            if (inflights == 0) {
-                waitRepFut.complete(null);
-            }
-            return waitRepFut;
-        }
-
-        private void cancelWaitingInflights(TablePartitionId groupId, Long enlistmentConsistencyToken) {
-            waitRepFut.completeExceptionally(new PrimaryReplicaExpiredException(groupId, enlistmentConsistencyToken, null, null));
-        }
-
-        void onRemovedInflights() {
-            if (inflights == 0 && finishInProgressFuture != null) {
-                waitRepFut.complete(null);
-            }
-        }
-
-        void finishTx(Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups) {
-            this.enlistedGroups = enlistedGroups;
-            finishInProgressFuture = new CompletableFuture<>();
-        }
-
-        boolean isTxFinishing() {
-            return finishInProgressFuture != null;
-        }
-
-        @Override
-        public String toString() {
-            return "TxContext [inflights=" + inflights + ", waitRepFut=" + waitRepFut + ", finishFut=" + finishInProgressFuture + ']';
-        }
-    }
-
     static class TransactionFailureHandler {
         private static final Set<Class<? extends Throwable>> RECOVERABLE = Set.of(
                 TimeoutException.class,
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 6383acadcf..538c432335 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.tx.impl.PrimaryReplicaExpiredException;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.ResourceCleanupManager;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
@@ -143,11 +144,14 @@ public class TxManagerTest extends IgniteAbstractTest {
 
         RemotelyTriggeredResourceRegistry resourceRegistry = new RemotelyTriggeredResourceRegistry();
 
+        TransactionInflights transactionInflights = new TransactionInflights(placementDriver);
+
         ResourceCleanupManager cleanupManager = new ResourceCleanupManager(
                 LOCAL_NODE.name(),
                 resourceRegistry,
                 clusterService.topologyService(),
-                clusterService.messagingService()
+                clusterService.messagingService(),
+                transactionInflights
         );
 
         txManager = new TxManagerImpl(
@@ -161,7 +165,8 @@ public class TxManagerTest extends IgniteAbstractTest {
                 idleSafeTimePropagationPeriodMsSupplier,
                 localRwTxCounter,
                 resourceRegistry,
-                cleanupManager
+                cleanupManager,
+                transactionInflights
         );
 
         txManager.start();
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index 0b97d676bd..5850292bfc 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -36,9 +36,6 @@ class ReadOnlyTransactionImplTest extends BaseIgniteAbstractTest {
     @Mock
     private TxManagerImpl txManager;
 
-    @Mock
-    private ResourceCleanupManager resourceCleanupManager;
-
     @Test
     void effectiveSchemaTimestampIsReadTimestamp() {
         HybridTimestamp readTimestamp = new HybridClockImpl().now();
@@ -49,8 +46,7 @@ class ReadOnlyTransactionImplTest extends BaseIgniteAbstractTest {
                 new HybridTimestampTracker(),
                 txId,
                 "localId",
-                readTimestamp,
-                resourceCleanupManager
+                readTimestamp
         );
 
         assertThat(tx.startTimestamp(), is(readTimestamp));

Reply via email to